diff --git a/cmd/atecontroller/internal/controllers/workerpool_apply.go b/cmd/atecontroller/internal/controllers/workerpool_apply.go new file mode 100644 index 000000000..e9ce1f7a1 --- /dev/null +++ b/cmd/atecontroller/internal/controllers/workerpool_apply.go @@ -0,0 +1,184 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + corev1 "k8s.io/api/core/v1" + appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" + corev1ac "k8s.io/client-go/applyconfigurations/core/v1" + metav1ac "k8s.io/client-go/applyconfigurations/meta/v1" + + "github.com/agent-substrate/substrate/internal/ateompath" + atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" +) + +// buildDeploymentApplyConfig constructs the SSA apply configuration for the +// Deployment managed by a WorkerPool. Only fields owned by this controller +// are declared here. +func buildDeploymentApplyConfig(wp *atev1alpha1.WorkerPool) *appsv1ac.DeploymentApplyConfiguration { + containerAC := corev1ac.Container(). + WithName("ateom"). + WithImage(wp.Spec.AteomImage). + WithArgs( + "--pod-uid=$(POD_UID)", + ). + WithSecurityContext(corev1ac.SecurityContext(). + WithPrivileged(true). + WithRunAsUser(0). + WithRunAsGroup(0)). + WithEnv( + corev1ac.EnvVar(). + WithName("POD_UID"). + WithValueFrom(corev1ac.EnvVarSource(). + WithFieldRef(corev1ac.ObjectFieldSelector(). + WithFieldPath("metadata.uid"))), + ). 
+ WithVolumeMounts(corev1ac.VolumeMount(). + WithName("run-ateom"). + WithMountPath(ateompath.BasePath)) + + podSpecAC := corev1ac.PodSpec(). + WithSecurityContext(corev1ac.PodSecurityContext(). + WithRunAsUser(0). + WithRunAsGroup(0)). + WithVolumes(corev1ac.Volume(). + WithName("run-ateom"). + WithHostPath(corev1ac.HostPathVolumeSource(). + WithPath(ateompath.BasePath). + WithType(corev1.HostPathDirectoryOrCreate))) + + applyWorkerPoolPodTemplate(podSpecAC, wp.Spec.Template) + podSpecAC.WithContainers(containerAC) + + return appsv1ac.Deployment(deploymentName(wp.Name), wp.Namespace). + WithOwnerReferences(metav1ac.OwnerReference(). + WithAPIVersion(atev1alpha1.GroupVersion.String()). + WithKind("WorkerPool"). + WithName(wp.Name). + WithUID(wp.UID). + WithController(true). + WithBlockOwnerDeletion(true)). + WithSpec(appsv1ac.DeploymentSpec(). + WithReplicas(wp.Spec.Replicas). + WithSelector(metav1ac.LabelSelector(). + WithMatchLabels(map[string]string{"ate.dev/worker-pool": wp.Name})). + WithTemplate(corev1ac.PodTemplateSpec(). + WithLabels(map[string]string{ + "ate.dev/worker-pool": wp.Name, + }). 
+ WithSpec(podSpecAC))) +} + +func applyWorkerPoolPodTemplate( + podSpecAC *corev1ac.PodSpecApplyConfiguration, + tmpl *atev1alpha1.WorkerPoolPodTemplate, +) { + podSpecAC.NodeSelector = map[string]string{} + podSpecAC.Tolerations = []corev1ac.TolerationApplyConfiguration{} + podSpecAC.WithPriorityClassName("") + podSpecAC.WithAffinity(corev1ac.Affinity()) + + if tmpl == nil { + return + } + + if tmpl.NodeSelector != nil { + podSpecAC.WithNodeSelector(tmpl.NodeSelector) + } + podSpecAC.Tolerations = tolerationApplyValues(tolerationsToApply(tmpl.Tolerations)) + podSpecAC.WithPriorityClassName(tmpl.PriorityClassName) + + if tmpl.NodeAffinity != nil { + podSpecAC.WithAffinity(corev1ac.Affinity().WithNodeAffinity(nodeAffinityToApply(tmpl.NodeAffinity))) + } +} + +func tolerationApplyValues(tolerations []*corev1ac.TolerationApplyConfiguration) []corev1ac.TolerationApplyConfiguration { + out := make([]corev1ac.TolerationApplyConfiguration, 0, len(tolerations)) + for _, toleration := range tolerations { + out = append(out, *toleration) + } + return out +} + +func tolerationsToApply(tolerations []corev1.Toleration) []*corev1ac.TolerationApplyConfiguration { + out := make([]*corev1ac.TolerationApplyConfiguration, 0, len(tolerations)) + for i := range tolerations { + t := &tolerations[i] + ac := corev1ac.Toleration() + if t.Key != "" { + ac.WithKey(t.Key) + } + if t.Operator != "" { + ac.WithOperator(t.Operator) + } + if t.Value != "" { + ac.WithValue(t.Value) + } + if t.Effect != "" { + ac.WithEffect(t.Effect) + } + if t.TolerationSeconds != nil { + ac.WithTolerationSeconds(*t.TolerationSeconds) + } + out = append(out, ac) + } + return out +} + +func nodeAffinityToApply(na *corev1.NodeAffinity) *corev1ac.NodeAffinityApplyConfiguration { + ac := corev1ac.NodeAffinity() + if na.RequiredDuringSchedulingIgnoredDuringExecution != nil { + ac.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelectorToApply(na.RequiredDuringSchedulingIgnoredDuringExecution)) + } + for i := 
range na.PreferredDuringSchedulingIgnoredDuringExecution { + term := &na.PreferredDuringSchedulingIgnoredDuringExecution[i] + ac.WithPreferredDuringSchedulingIgnoredDuringExecution(preferredSchedulingTermToApply(term)) + } + return ac +} + +func nodeSelectorToApply(ns *corev1.NodeSelector) *corev1ac.NodeSelectorApplyConfiguration { + ac := corev1ac.NodeSelector() + for i := range ns.NodeSelectorTerms { + ac.WithNodeSelectorTerms(nodeSelectorTermToApply(&ns.NodeSelectorTerms[i])) + } + return ac +} + +func preferredSchedulingTermToApply(term *corev1.PreferredSchedulingTerm) *corev1ac.PreferredSchedulingTermApplyConfiguration { + return corev1ac.PreferredSchedulingTerm(). + WithWeight(term.Weight). + WithPreference(nodeSelectorTermToApply(&term.Preference)) +} + +func nodeSelectorTermToApply(term *corev1.NodeSelectorTerm) *corev1ac.NodeSelectorTermApplyConfiguration { + ac := corev1ac.NodeSelectorTerm() + for i := range term.MatchExpressions { + ac.WithMatchExpressions(nodeSelectorRequirementToApply(&term.MatchExpressions[i])) + } + for i := range term.MatchFields { + ac.WithMatchFields(nodeSelectorRequirementToApply(&term.MatchFields[i])) + } + return ac +} + +func nodeSelectorRequirementToApply(req *corev1.NodeSelectorRequirement) *corev1ac.NodeSelectorRequirementApplyConfiguration { + ac := corev1ac.NodeSelectorRequirement().WithKey(req.Key).WithOperator(req.Operator) + if len(req.Values) > 0 { + ac.WithValues(req.Values...) + } + return ac +} diff --git a/cmd/atecontroller/internal/controllers/workerpool_apply_test.go b/cmd/atecontroller/internal/controllers/workerpool_apply_test.go new file mode 100644 index 000000000..5d1ccda80 --- /dev/null +++ b/cmd/atecontroller/internal/controllers/workerpool_apply_test.go @@ -0,0 +1,248 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" + corev1ac "k8s.io/client-go/applyconfigurations/core/v1" + metav1ac "k8s.io/client-go/applyconfigurations/meta/v1" + + "github.com/agent-substrate/substrate/internal/ateompath" + atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" +) + +func TestBuildDeploymentApplyConfig(t *testing.T) { + requiredNodeAffinity := &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{{ + MatchExpressions: []corev1.NodeSelectorRequirement{{ + Key: "workload", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"substrate"}, + }}, + }}, + }, + } + preferredNodeAffinity := &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{{ + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{{ + Key: "disk", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"ssd"}, + }}, + }, + }}, + } + tolerationSeconds := int64(300) + toleration := corev1.Toleration{ + Key: "dedicated", + Operator: corev1.TolerationOpEqual, + Value: "workerpool", + Effect: corev1.TaintEffectNoSchedule, + TolerationSeconds: &tolerationSeconds, + } + + tests := []struct { + name string + wp *atev1alpha1.WorkerPool + want *appsv1ac.DeploymentApplyConfiguration + }{ + { + name: "default 
workerpool", + wp: testWorkerPoolApplyConfig(nil), + want: expectedDeploymentApplyConfig(nil), + }, + { + name: "with node selector", + wp: testWorkerPoolApplyConfig(&atev1alpha1.WorkerPoolPodTemplate{ + NodeSelector: map[string]string{ + "accelerator": "gpu", + "topology": "high-mem", + }, + }), + want: expectedDeploymentApplyConfig(func(podSpecAC *corev1ac.PodSpecApplyConfiguration) { + podSpecAC.WithNodeSelector(map[string]string{ + "accelerator": "gpu", + "topology": "high-mem", + }) + }), + }, + { + name: "with tolerations", + wp: testWorkerPoolApplyConfig(&atev1alpha1.WorkerPoolPodTemplate{ + Tolerations: []corev1.Toleration{toleration}, + }), + want: expectedDeploymentApplyConfig(func(podSpecAC *corev1ac.PodSpecApplyConfiguration) { + podSpecAC.Tolerations = []corev1ac.TolerationApplyConfiguration{ + *corev1ac.Toleration(). + WithKey("dedicated"). + WithOperator(corev1.TolerationOpEqual). + WithValue("workerpool"). + WithEffect(corev1.TaintEffectNoSchedule). + WithTolerationSeconds(300), + } + }), + }, + { + name: "with node affinity", + wp: testWorkerPoolApplyConfig(&atev1alpha1.WorkerPoolPodTemplate{ + NodeAffinity: requiredNodeAffinity, + }), + want: expectedDeploymentApplyConfig(func(podSpecAC *corev1ac.PodSpecApplyConfiguration) { + podSpecAC.WithAffinity(corev1ac.Affinity().WithNodeAffinity( + corev1ac.NodeAffinity().WithRequiredDuringSchedulingIgnoredDuringExecution( + corev1ac.NodeSelector().WithNodeSelectorTerms( + corev1ac.NodeSelectorTerm().WithMatchExpressions( + corev1ac.NodeSelectorRequirement(). + WithKey("workload"). + WithOperator(corev1.NodeSelectorOpIn). 
+ WithValues("substrate"), + ), + ), + ), + )) + }), + }, + { + name: "with priority class name", + wp: testWorkerPoolApplyConfig(&atev1alpha1.WorkerPoolPodTemplate{ + PriorityClassName: "interactive-workerpool", + }), + want: expectedDeploymentApplyConfig(func(podSpecAC *corev1ac.PodSpecApplyConfiguration) { + podSpecAC.WithPriorityClassName("interactive-workerpool") + }), + }, + { + name: "with combined scheduling fields", + wp: testWorkerPoolApplyConfig(&atev1alpha1.WorkerPoolPodTemplate{ + NodeSelector: map[string]string{ + "accelerator": "gpu", + "topology": "high-mem", + }, + Tolerations: []corev1.Toleration{toleration}, + PriorityClassName: "interactive-workerpool", + NodeAffinity: preferredNodeAffinity, + }), + want: expectedDeploymentApplyConfig(func(podSpecAC *corev1ac.PodSpecApplyConfiguration) { + podSpecAC.WithNodeSelector(map[string]string{ + "accelerator": "gpu", + "topology": "high-mem", + }) + podSpecAC.Tolerations = []corev1ac.TolerationApplyConfiguration{ + *corev1ac.Toleration(). + WithKey("dedicated"). + WithOperator(corev1.TolerationOpEqual). + WithValue("workerpool"). + WithEffect(corev1.TaintEffectNoSchedule). + WithTolerationSeconds(300), + } + podSpecAC.WithPriorityClassName("interactive-workerpool") + podSpecAC.WithAffinity(corev1ac.Affinity().WithNodeAffinity( + corev1ac.NodeAffinity().WithPreferredDuringSchedulingIgnoredDuringExecution( + corev1ac.PreferredSchedulingTerm(). + WithWeight(50). + WithPreference(corev1ac.NodeSelectorTerm().WithMatchExpressions( + corev1ac.NodeSelectorRequirement(). + WithKey("disk"). + WithOperator(corev1.NodeSelectorOpIn). 
+ WithValues("ssd"), + )), + ), + )) + }), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := buildDeploymentApplyConfig(tt.wp) + if diff := cmp.Diff(tt.want, got); diff != "" { + t.Fatalf("buildDeploymentApplyConfig() mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func testWorkerPoolApplyConfig(tmpl *atev1alpha1.WorkerPoolPodTemplate) *atev1alpha1.WorkerPool { + return &atev1alpha1.WorkerPool{ + ObjectMeta: metav1.ObjectMeta{Name: "pool", Namespace: "default", UID: "uid"}, + Spec: atev1alpha1.WorkerPoolSpec{ + Replicas: 2, + AteomImage: "ateom:v1", + Template: tmpl, + }, + } +} + +func expectedDeploymentApplyConfig(mutatePodSpec func(*corev1ac.PodSpecApplyConfiguration)) *appsv1ac.DeploymentApplyConfiguration { + wp := testWorkerPoolApplyConfig(nil) + + podSpecAC := corev1ac.PodSpec(). + WithSecurityContext(corev1ac.PodSecurityContext(). + WithRunAsUser(0). + WithRunAsGroup(0)). + WithVolumes(corev1ac.Volume(). + WithName("run-ateom"). + WithHostPath(corev1ac.HostPathVolumeSource(). + WithPath(ateompath.BasePath). + WithType(corev1.HostPathDirectoryOrCreate))). + WithContainers(corev1ac.Container(). + WithName("ateom"). + WithImage(wp.Spec.AteomImage). + WithArgs("--pod-uid=$(POD_UID)"). + WithSecurityContext(corev1ac.SecurityContext(). + WithPrivileged(true). + WithRunAsUser(0). + WithRunAsGroup(0)). + WithEnv(corev1ac.EnvVar(). + WithName("POD_UID"). + WithValueFrom(corev1ac.EnvVarSource(). + WithFieldRef(corev1ac.ObjectFieldSelector(). + WithFieldPath("metadata.uid")))). + WithVolumeMounts(corev1ac.VolumeMount(). + WithName("run-ateom"). + WithMountPath(ateompath.BasePath))) + + podSpecAC.NodeSelector = map[string]string{} + podSpecAC.Tolerations = []corev1ac.TolerationApplyConfiguration{} + podSpecAC.WithPriorityClassName("") + podSpecAC.WithAffinity(corev1ac.Affinity()) + if mutatePodSpec != nil { + mutatePodSpec(podSpecAC) + } + + return appsv1ac.Deployment(deploymentName(wp.Name), wp.Namespace). 
+ WithOwnerReferences(metav1ac.OwnerReference(). + WithAPIVersion(atev1alpha1.GroupVersion.String()). + WithKind("WorkerPool"). + WithName(wp.Name). + WithUID(wp.UID). + WithController(true). + WithBlockOwnerDeletion(true)). + WithSpec(appsv1ac.DeploymentSpec(). + WithReplicas(wp.Spec.Replicas). + WithSelector(metav1ac.LabelSelector(). + WithMatchLabels(map[string]string{"ate.dev/worker-pool": wp.Name})). + WithTemplate(corev1ac.PodTemplateSpec(). + WithLabels(map[string]string{"ate.dev/worker-pool": wp.Name}). + WithSpec(podSpecAC))) +} diff --git a/cmd/atecontroller/internal/controllers/workerpool_controller.go b/cmd/atecontroller/internal/controllers/workerpool_controller.go index e72d2befd..9197563c2 100644 --- a/cmd/atecontroller/internal/controllers/workerpool_controller.go +++ b/cmd/atecontroller/internal/controllers/workerpool_controller.go @@ -19,19 +19,14 @@ import ( "fmt" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" k8errors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" - corev1ac "k8s.io/client-go/applyconfigurations/core/v1" - metav1ac "k8s.io/client-go/applyconfigurations/meta/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/agent-substrate/substrate/internal/ateompath" atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" ) @@ -116,57 +111,6 @@ func (r *WorkerPoolReconciler) syncStatus(ctx context.Context, wp *atev1alpha1.W return nil } -// buildDeploymentApplyConfig constructs the SSA apply configuration for the -// Deployment managed by a WorkerPool. Only fields owned by this controller -// are declared here. -func buildDeploymentApplyConfig(wp *atev1alpha1.WorkerPool) *appsv1ac.DeploymentApplyConfiguration { - return appsv1ac.Deployment(deploymentName(wp.Name), wp.Namespace). 
- WithOwnerReferences(metav1ac.OwnerReference(). - WithAPIVersion(atev1alpha1.GroupVersion.String()). - WithKind("WorkerPool"). - WithName(wp.Name). - WithUID(wp.UID). - WithController(true). - WithBlockOwnerDeletion(true)). - WithSpec(appsv1ac.DeploymentSpec(). - WithReplicas(wp.Spec.Replicas). - WithSelector(metav1ac.LabelSelector(). - WithMatchLabels(map[string]string{"ate.dev/worker-pool": wp.Name})). - WithTemplate(corev1ac.PodTemplateSpec(). - WithLabels(map[string]string{ - "ate.dev/worker-pool": wp.Name, - }). - WithSpec(corev1ac.PodSpec(). - WithContainers(corev1ac.Container(). - WithName("ateom"). - WithImage(wp.Spec.AteomImage). - WithArgs( - "--pod-uid=$(POD_UID)", - ). - WithSecurityContext(corev1ac.SecurityContext(). - WithPrivileged(true). - WithRunAsUser(0). - WithRunAsGroup(0)). - WithEnv( - corev1ac.EnvVar(). - WithName("POD_UID"). - WithValueFrom(corev1ac.EnvVarSource(). - WithFieldRef(corev1ac.ObjectFieldSelector(). - WithFieldPath("metadata.uid"))), - ). - WithVolumeMounts(corev1ac.VolumeMount(). - WithName("run-ateom"). - WithMountPath(ateompath.BasePath))). - WithSecurityContext(corev1ac.PodSecurityContext(). - WithRunAsUser(0). - WithRunAsGroup(0)). - WithVolumes(corev1ac.Volume(). - WithName("run-ateom"). - WithHostPath(corev1ac.HostPathVolumeSource(). - WithPath(ateompath.BasePath). - WithType(corev1.HostPathDirectoryOrCreate)))))) -} - // SetupWithManager sets up the controller with the Manager. func (r *WorkerPoolReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). 
diff --git a/cmd/atecontroller/internal/controllers/workerpool_controller_test.go b/cmd/atecontroller/internal/controllers/workerpool_controller_test.go index 3acf49248..f0435de7e 100644 --- a/cmd/atecontroller/internal/controllers/workerpool_controller_test.go +++ b/cmd/atecontroller/internal/controllers/workerpool_controller_test.go @@ -24,6 +24,7 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" k8errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -341,6 +342,214 @@ func TestStatusReplicasPropagation(t *testing.T) { }) } +func sampleWorkerPoolPodTemplate() *atev1alpha1.WorkerPoolPodTemplate { + return &atev1alpha1.WorkerPoolPodTemplate{ + NodeSelector: map[string]string{ + "workload": "substrate", + }, + Tolerations: []corev1.Toleration{{ + Key: "nvidia.com/gpu", + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoSchedule, + }}, + PriorityClassName: "substrate-workers", + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{{ + MatchExpressions: []corev1.NodeSelectorRequirement{{ + Key: "workload", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"substrate"}, + }}, + }}, + }, + }, + } +} + +// TestWorkerPoolPodTemplatePropagation verifies that template fields propagate +// to the managed Deployment pod template. 
+func TestWorkerPoolPodTemplatePropagation(t *testing.T) { + wp := makeWorkerPool("test-template-propagate", "default", 1, "ateom:v1") + wp.Spec.Template = sampleWorkerPoolPodTemplate() + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + t.Cleanup(func() { k8sClient.Delete(testCtx, wp) }) //nolint:errcheck + + eventually(t, func(ctx context.Context) (bool, error) { + dep, err := getDeployment(ctx, wp) + if err != nil || len(dep.Spec.Template.Spec.Containers) == 0 { + return false, nil + } + podSpec := dep.Spec.Template.Spec + container := podSpec.Containers[0] + + if podSpec.NodeSelector["workload"] != "substrate" { + return false, nil + } + if len(podSpec.Tolerations) != 1 || podSpec.Tolerations[0].Key != "nvidia.com/gpu" { + return false, nil + } + if podSpec.PriorityClassName != "substrate-workers" { + return false, nil + } + if podSpec.Affinity == nil || podSpec.Affinity.NodeAffinity == nil { + return false, nil + } + return len(container.Resources.Requests) == 0 && len(container.Resources.Limits) == 0, nil + }) +} + +// TestWorkerPoolPodTemplateUpdate verifies that changing template fields on a +// WorkerPool propagates to the managed Deployment. 
+func TestWorkerPoolPodTemplateUpdate(t *testing.T) { + wp := makeWorkerPool("test-template-update", "default", 1, "ateom:v1") + wp.Spec.Template = sampleWorkerPoolPodTemplate() + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + t.Cleanup(func() { k8sClient.Delete(testCtx, wp) }) //nolint:errcheck + + eventually(t, func(ctx context.Context) (bool, error) { + dep, err := getDeployment(ctx, wp) + return err == nil && dep.Spec.Template.Spec.NodeSelector["workload"] == "substrate", nil + }) + + if err := k8sClient.Get(testCtx, types.NamespacedName{Name: wp.Name, Namespace: wp.Namespace}, wp); err != nil { + t.Fatalf("re-fetch WorkerPool: %v", err) + } + wp.Spec.Template.NodeSelector = map[string]string{"workload": "updated"} + if err := k8sClient.Update(testCtx, wp); err != nil { + t.Fatalf("update WorkerPool template: %v", err) + } + + eventually(t, func(ctx context.Context) (bool, error) { + dep, err := getDeployment(ctx, wp) + if err != nil || len(dep.Spec.Template.Spec.Containers) == 0 { + return false, nil + } + podSpec := dep.Spec.Template.Spec + return podSpec.NodeSelector["workload"] == "updated" && + len(podSpec.Containers[0].Resources.Requests) == 0, nil + }) +} + +// TestWorkerPoolPodTemplateClear verifies that clearing template.nodeSelector +// removes it from the managed Deployment. 
+func TestWorkerPoolPodTemplateClear(t *testing.T) { + wp := makeWorkerPool("test-template-clear", "default", 1, "ateom:v1") + wp.Spec.Template = sampleWorkerPoolPodTemplate() + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + t.Cleanup(func() { k8sClient.Delete(testCtx, wp) }) //nolint:errcheck + + eventually(t, func(ctx context.Context) (bool, error) { + dep, err := getDeployment(ctx, wp) + return err == nil && dep.Spec.Template.Spec.NodeSelector["workload"] == "substrate", nil + }) + + if err := k8sClient.Get(testCtx, types.NamespacedName{Name: wp.Name, Namespace: wp.Namespace}, wp); err != nil { + t.Fatalf("re-fetch WorkerPool: %v", err) + } + wp.Spec.Template.NodeSelector = nil + if err := k8sClient.Update(testCtx, wp); err != nil { + t.Fatalf("clear WorkerPool nodeSelector: %v", err) + } + + eventually(t, func(ctx context.Context) (bool, error) { + dep, err := getDeployment(ctx, wp) + if err != nil { + return false, nil + } + return len(dep.Spec.Template.Spec.NodeSelector) == 0, nil + }) +} + +// TestWorkerPoolPodTemplateClearAll verifies that removing spec.template clears +// all pod template fields owned by the workerpool-controller. 
+func TestWorkerPoolPodTemplateClearAll(t *testing.T) { + wp := makeWorkerPool("test-template-clear-all", "default", 1, "ateom:v1") + wp.Spec.Template = sampleWorkerPoolPodTemplate() + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + t.Cleanup(func() { k8sClient.Delete(testCtx, wp) }) //nolint:errcheck + + eventually(t, func(ctx context.Context) (bool, error) { + dep, err := getDeployment(ctx, wp) + if err != nil || len(dep.Spec.Template.Spec.Containers) == 0 { + return false, nil + } + podSpec := dep.Spec.Template.Spec + container := podSpec.Containers[0] + return podSpec.NodeSelector["workload"] == "substrate" && + len(podSpec.Tolerations) == 1 && + podSpec.PriorityClassName == "substrate-workers" && + podSpec.Affinity != nil && + podSpec.Affinity.NodeAffinity != nil && + len(container.Resources.Requests) == 0, nil + }) + + if err := k8sClient.Get(testCtx, types.NamespacedName{Name: wp.Name, Namespace: wp.Namespace}, wp); err != nil { + t.Fatalf("re-fetch WorkerPool: %v", err) + } + wp.Spec.Template = nil + if err := k8sClient.Update(testCtx, wp); err != nil { + t.Fatalf("clear WorkerPool template: %v", err) + } + + eventually(t, func(ctx context.Context) (bool, error) { + dep, err := getDeployment(ctx, wp) + if err != nil || len(dep.Spec.Template.Spec.Containers) == 0 { + return false, nil + } + podSpec := dep.Spec.Template.Spec + container := podSpec.Containers[0] + return len(podSpec.NodeSelector) == 0 && + len(podSpec.Tolerations) == 0 && + podSpec.PriorityClassName == "" && + (podSpec.Affinity == nil || podSpec.Affinity.NodeAffinity == nil) && + len(container.Resources.Limits) == 0 && + len(container.Resources.Requests) == 0, nil + }) +} + +// TestSSARevertsOwnedPodTemplateFields verifies that if an external actor +// changes pod template fields owned by the workerpool-controller, the +// controller reverts them on the next reconcile. 
+func TestSSARevertsOwnedPodTemplateFields(t *testing.T) { + wp := makeWorkerPool("test-ssa-template", "default", 1, "ateom:v1") + wp.Spec.Template = sampleWorkerPoolPodTemplate() + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + t.Cleanup(func() { k8sClient.Delete(testCtx, wp) }) //nolint:errcheck + + eventually(t, func(ctx context.Context) (bool, error) { + dep, err := getDeployment(ctx, wp) + return err == nil && dep.Spec.Template.Spec.NodeSelector["workload"] == "substrate", nil + }) + + dep, err := getDeployment(testCtx, wp) + if err != nil { + t.Fatalf("get Deployment: %v", err) + } + dep.Spec.Template.Spec.NodeSelector = map[string]string{"workload": "rogue"} + if err := k8sClient.Update(testCtx, dep); err != nil { + t.Fatalf("rogue update: %v", err) + } + + eventually(t, func(ctx context.Context) (bool, error) { + dep, err := getDeployment(ctx, wp) + if err != nil { + return false, nil + } + return dep.Spec.Template.Spec.NodeSelector["workload"] == "substrate", nil + }) +} + // TestReplicasValidationRejectsNegative verifies that the API server rejects a // WorkerPool whose spec.replicas is negative. func TestReplicasValidationRejectsNegative(t *testing.T) { diff --git a/docs/api-guide.md b/docs/api-guide.md index fe609e3ad..24b057dd7 100644 --- a/docs/api-guide.md +++ b/docs/api-guide.md @@ -12,6 +12,16 @@ The `WorkerPool` defines the pool of physical "warm" compute capacity. It manage | :--- | :--- | :--- | | `replicas` | `int32` | **Required.** Number of physical standby pods to maintain in the cluster. | | `ateomImage` | `string` | **Required.** The container image for the `ateom` herder process (e.g. `ko://github.com/agent-substrate/substrate/cmd/ateom-gvisor`). | +| `template` | `WorkerPoolPodTemplate` | **Optional.** Pod scheduling settings for worker pods (see `WorkerPoolPodTemplate` below). 
| + +#### `WorkerPoolPodTemplate` (`spec.template`) + +| Field | Type | Pod mapping | +| :--- | :--- | :--- | +| `nodeSelector` | `map[string]string` | `spec.nodeSelector` | +| `tolerations` | `[]Toleration` | `spec.tolerations` (max 16) | +| `priorityClassName` | `string` | `spec.priorityClassName` | +| `nodeAffinity` | `NodeAffinity` | `spec.affinity.nodeAffinity` | ### Example @@ -26,6 +36,34 @@ spec: ateomImage: ko://github.com/agent-substrate/substrate/cmd/ateom-gvisor ``` +### Example with GPU node scheduling + +```yaml +apiVersion: ate.dev/v1alpha1 +kind: WorkerPool +metadata: + name: gpu-pool + namespace: ate-demo +spec: + replicas: 5 + ateomImage: ko://github.com/agent-substrate/substrate/cmd/ateom-gvisor + template: + nodeSelector: + cloud.google.com/gke-accelerator: nvidia-tesla-t4 + tolerations: + - key: nvidia.com/gpu + operator: Exists + effect: NoSchedule + priorityClassName: substrate-workers + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: workload + operator: In + values: [substrate] +``` + --- ## 2. ActorTemplate: The Workload Blueprint diff --git a/docs/architecture.md b/docs/architecture.md index 883a3c7ed..84e4addc1 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -223,9 +223,10 @@ These resources define the intended state of the system and are managed via Kubernetes CRD APIs. They are used for administrative operations and actor environment definitions. - * **WorkerPool**: Defines a pool of "warm" compute capacity. It specifies the - hardware shape (CPU, memory, accelerators), manages a fleet of standby - worker pods initialized and ready to receive resumed actor states. + * **WorkerPool**: Defines a pool of "warm" compute capacity. It manages a + fleet of standby worker pods initialized and ready to receive resumed actor + states. Optional `spec.template` fields configure worker pod node + selection, tolerations, priority class, and node affinity. 
* **ActorTemplate**: An immutable definition of an actor-version. It encapsulates the container image, configuration, and environment required diff --git a/manifests/ate-install/generated/ate.dev_workerpools.yaml b/manifests/ate-install/generated/ate.dev_workerpools.yaml index 634512b23..65c19137d 100644 --- a/manifests/ate-install/generated/ate.dev_workerpools.yaml +++ b/manifests/ate-install/generated/ate.dev_workerpools.yaml @@ -75,6 +75,262 @@ spec: format: int32 minimum: 0 type: integer + template: + description: Template holds optional pod scheduling and resource settings + for worker pods. + properties: + nodeAffinity: + description: |- + NodeAffinity scheduling rules for the worker pods. Mapped to + spec.affinity.nodeAffinity on the pod. + properties: + preferredDuringSchedulingIgnoredDuringExecution: + description: |- + The scheduler will prefer to schedule pods to nodes that satisfy + the affinity expressions specified by this field, but it may choose + a node that violates one or more of the expressions. The node that is + most preferred is the one with the greatest sum of weights, i.e. + for each node that meets all of the scheduling requirements (resource + request, requiredDuringScheduling affinity expressions, etc.), + compute a sum by iterating through the elements of this field and adding + "weight" to the sum if the node matches the corresponding matchExpressions; the + node(s) with the highest sum are the most preferred. + items: + description: |- + An empty preferred scheduling term matches all objects with implicit weight 0 + (i.e. it's a no-op). A null preferred scheduling term matches no objects (i.e. is also a no-op). + properties: + preference: + description: A node selector term, associated with the + corresponding weight. + properties: + matchExpressions: + description: A list of node selector requirements + by node's labels. 
+ items: + description: |- + A node selector requirement is a selector that contains values, a key, and an operator + that relates the key and values. + properties: + key: + description: The label key that the selector + applies to. + type: string + operator: + description: |- + Represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: |- + An array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator is Gt or Lt, the values + array must have a single element, which will be interpreted as an integer. + This array is replaced during a strategic merge patch. + items: + type: string + type: array + x-kubernetes-list-type: atomic + required: + - key + - operator + type: object + type: array + x-kubernetes-list-type: atomic + matchFields: + description: A list of node selector requirements + by node's fields. + items: + description: |- + A node selector requirement is a selector that contains values, a key, and an operator + that relates the key and values. + properties: + key: + description: The label key that the selector + applies to. + type: string + operator: + description: |- + Represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: |- + An array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator is Gt or Lt, the values + array must have a single element, which will be interpreted as an integer. + This array is replaced during a strategic merge patch. 
+ items: + type: string + type: array + x-kubernetes-list-type: atomic + required: + - key + - operator + type: object + type: array + x-kubernetes-list-type: atomic + type: object + x-kubernetes-map-type: atomic + weight: + description: Weight associated with matching the corresponding + nodeSelectorTerm, in the range 1-100. + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + x-kubernetes-list-type: atomic + requiredDuringSchedulingIgnoredDuringExecution: + description: |- + If the affinity requirements specified by this field are not met at + scheduling time, the pod will not be scheduled onto the node. + If the affinity requirements specified by this field cease to be met + at some point during pod execution (e.g. due to an update), the system + may or may not try to eventually evict the pod from its node. + properties: + nodeSelectorTerms: + description: Required. A list of node selector terms. + The terms are ORed. + items: + description: |- + A null or empty node selector term matches no objects. The requirements of + them are ANDed. + The TopologySelectorTerm type implements a subset of the NodeSelectorTerm. + properties: + matchExpressions: + description: A list of node selector requirements + by node's labels. + items: + description: |- + A node selector requirement is a selector that contains values, a key, and an operator + that relates the key and values. + properties: + key: + description: The label key that the selector + applies to. + type: string + operator: + description: |- + Represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: |- + An array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. 
If the operator is Gt or Lt, the values + array must have a single element, which will be interpreted as an integer. + This array is replaced during a strategic merge patch. + items: + type: string + type: array + x-kubernetes-list-type: atomic + required: + - key + - operator + type: object + type: array + x-kubernetes-list-type: atomic + matchFields: + description: A list of node selector requirements + by node's fields. + items: + description: |- + A node selector requirement is a selector that contains values, a key, and an operator + that relates the key and values. + properties: + key: + description: The label key that the selector + applies to. + type: string + operator: + description: |- + Represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt. + type: string + values: + description: |- + An array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. If the operator is Gt or Lt, the values + array must have a single element, which will be interpreted as an integer. + This array is replaced during a strategic merge patch. + items: + type: string + type: array + x-kubernetes-list-type: atomic + required: + - key + - operator + type: object + type: array + x-kubernetes-list-type: atomic + type: object + x-kubernetes-map-type: atomic + type: array + x-kubernetes-list-type: atomic + required: + - nodeSelectorTerms + type: object + x-kubernetes-map-type: atomic + type: object + nodeSelector: + additionalProperties: + type: string + description: NodeSelector is a selector which must be true for + the pod to fit on a node. + type: object + priorityClassName: + description: PriorityClassName for the worker pods. + type: string + tolerations: + description: Tolerations for the worker pods. 
+ items: + description: |- + The pod this Toleration is attached to tolerates any taint that matches + the triple <key,value,effect> using the matching operator <operator>. + properties: + effect: + description: |- + Effect indicates the taint effect to match. Empty means match all taint effects. + When specified, allowed values are NoSchedule, PreferNoSchedule and NoExecute. + type: string + key: + description: |- + Key is the taint key that the toleration applies to. Empty means match all taint keys. + If the key is empty, operator must be Exists; this combination means to match all values and all keys. + type: string + operator: + description: |- + Operator represents a key's relationship to the value. + Valid operators are Exists, Equal, Lt, and Gt. Defaults to Equal. + Exists is equivalent to wildcard for value, so that a pod can + tolerate all taints of a particular category. + Lt and Gt perform numeric comparisons (requires feature gate TaintTolerationComparisonOperators). + type: string + tolerationSeconds: + description: |- + TolerationSeconds represents the period of time the toleration (which must be + of effect NoExecute, otherwise this field is ignored) tolerates the taint. By default, + it is not set, which means tolerate the taint forever (do not evict). Zero and + negative values will be treated as 0 (evict immediately) by the system. + format: int64 + type: integer + value: + description: |- + Value is the taint value the toleration matches to. + If the operator is Exists, the value should be empty, otherwise just a regular string. 
+ type: string + type: object + maxItems: 16 + type: array + x-kubernetes-list-type: atomic + type: object required: - ateomImage - replicas diff --git a/pkg/api/v1alpha1/workerpool_types.go b/pkg/api/v1alpha1/workerpool_types.go index 2350432b6..2a831389a 100644 --- a/pkg/api/v1alpha1/workerpool_types.go +++ b/pkg/api/v1alpha1/workerpool_types.go @@ -15,9 +15,37 @@ package v1alpha1 import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// WorkerPoolPodTemplate defines optional scheduling and resource settings for +// worker pods. NodeAffinity is mapped to spec.affinity.nodeAffinity on the pod. +type WorkerPoolPodTemplate struct { + // NodeSelector is a selector which must be true for the pod to fit on a node. + // + // +optional + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + + // Tolerations for the worker pods. + // + // +optional + // +kubebuilder:validation:MaxItems=16 + // +listType=atomic + Tolerations []corev1.Toleration `json:"tolerations,omitempty"` + + // PriorityClassName for the worker pods. + // + // +optional + PriorityClassName string `json:"priorityClassName,omitempty"` + + // NodeAffinity scheduling rules for the worker pods. Mapped to + // spec.affinity.nodeAffinity on the pod. + // + // +optional + NodeAffinity *corev1.NodeAffinity `json:"nodeAffinity,omitempty"` +} + type WorkerPoolSpec struct { // Replicas is the number of worker pods to run. // +required @@ -28,6 +56,11 @@ type WorkerPoolSpec struct { // +kubebuilder:validation:MinLength=1 // +required AteomImage string `json:"ateomImage"` + + // Template holds optional pod scheduling and resource settings for worker pods. 
+ // + // +optional + Template *WorkerPoolPodTemplate `json:"template,omitempty"` } type WorkerPoolStatus struct { diff --git a/pkg/api/v1alpha1/workerpool_validation_test.go b/pkg/api/v1alpha1/workerpool_validation_test.go index bda091583..5c213bc1e 100644 --- a/pkg/api/v1alpha1/workerpool_validation_test.go +++ b/pkg/api/v1alpha1/workerpool_validation_test.go @@ -19,6 +19,7 @@ import ( "strings" "testing" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -59,6 +60,34 @@ func TestWorkerPoolValidation(t *testing.T) { }, wantErr: true, errMsg: "spec.ateomImage: Invalid value: \"\": spec.ateomImage in body should be at least 1 chars long", + }, { + name: "valid template", + mutate: func(wp *WorkerPool) { + wp.Spec.Template = &WorkerPoolPodTemplate{ + NodeSelector: map[string]string{"workload": "substrate"}, + Tolerations: []corev1.Toleration{{ + Key: "gpu", + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoSchedule, + }}, + } + }, + wantErr: false, + }, { + name: "too many tolerations", + mutate: func(wp *WorkerPool) { + tolerations := make([]corev1.Toleration, 17) + for i := range tolerations { + tolerations[i] = corev1.Toleration{ + Key: "key", + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoSchedule, + } + } + wp.Spec.Template = &WorkerPoolPodTemplate{Tolerations: tolerations} + }, + wantErr: true, + errMsg: "spec.template.tolerations: Too many", }} for _, tt := range tests { diff --git a/pkg/api/v1alpha1/zz_generated.deepcopy.go b/pkg/api/v1alpha1/zz_generated.deepcopy.go index 4bbfe02b6..9983ce2a5 100644 --- a/pkg/api/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha1/zz_generated.deepcopy.go @@ -19,6 +19,7 @@ package v1alpha1 import ( + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -318,7 +319,7 @@ func (in *WorkerPool) DeepCopyInto(out *WorkerPool) { *out = *in out.TypeMeta = in.TypeMeta 
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -372,9 +373,48 @@ func (in *WorkerPoolList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WorkerPoolPodTemplate) DeepCopyInto(out *WorkerPoolPodTemplate) { + *out = *in + if in.NodeSelector != nil { + in, out := &in.NodeSelector, &out.NodeSelector + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]corev1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.NodeAffinity != nil { + in, out := &in.NodeAffinity, &out.NodeAffinity + *out = new(corev1.NodeAffinity) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkerPoolPodTemplate. +func (in *WorkerPoolPodTemplate) DeepCopy() *WorkerPoolPodTemplate { + if in == nil { + return nil + } + out := new(WorkerPoolPodTemplate) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WorkerPoolSpec) DeepCopyInto(out *WorkerPoolSpec) { *out = *in + if in.Template != nil { + in, out := &in.Template, &out.Template + *out = new(WorkerPoolPodTemplate) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkerPoolSpec.