From 03fb8ee5597ff65d85d1a05460de90c6cd4921bc Mon Sep 17 00:00:00 2001 From: Omer Yahud Date: Mon, 8 Jun 2026 21:19:29 +0300 Subject: [PATCH 1/7] feat(api): add WorkerPool autoscaling bounds to WorkerPoolSpec Add minReady, targetBuffer, and maxReplicas as optional declarative inputs for the WorkerPool autoscaler, plus a CEL rule rejecting minReady > maxReplicas. Regenerate deepcopy and CRD. Part of #198 (Phase 1). --- .../generated/ate.dev_workerpools.yaml | 35 ++++++++++++++++++- pkg/api/v1alpha1/workerpool_types.go | 28 ++++++++++++++- pkg/api/v1alpha1/zz_generated.deepcopy.go | 17 ++++++++- 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/manifests/ate-install/generated/ate.dev_workerpools.yaml b/manifests/ate-install/generated/ate.dev_workerpools.yaml index 634512b23..ed4727f6f 100644 --- a/manifests/ate-install/generated/ate.dev_workerpools.yaml +++ b/manifests/ate-install/generated/ate.dev_workerpools.yaml @@ -70,8 +70,37 @@ spec: workers. minLength: 1 type: string + maxReplicas: + description: |- + MaxReplicas is the upper bound the autoscaler may grow the pool to. When + unset the autoscaler applies no ceiling of its own. + format: int32 + minimum: 0 + type: integer + minReady: + description: |- + MinReady is the minimum number of worker pods the autoscaler keeps the + pool at — the reservation floor it must never scale below. When unset the + pool may be scaled to zero. The floor is enforced by the autoscaler; the + WorkerPool controller never clamps Replicas itself, so that the scale + subresource keeps a single writer. + format: int32 + minimum: 0 + type: integer replicas: - description: Replicas is the number of worker pods to run. + description: |- + Replicas is the number of worker pods to run. When autoscaling is enabled + it is owned by the autoscaler (written via the scale subresource); the + fields below are the declarative inputs that drive it. + format: int32 + minimum: 0 + type: integer + targetBuffer: + description: |- + TargetBuffer is the desired number of idle (warm) workers the autoscaler + keeps available to absorb resume bursts. When the idle count falls below + this target the autoscaler provisions more workers, net of pods already + starting. When unset, buffer-based scale-up is disabled. format: int32 minimum: 0 type: integer @@ -79,6 +108,10 @@ spec: - ateomImage - replicas type: object + x-kubernetes-validations: + - message: minReady must not exceed maxReplicas + rule: '!has(self.minReady) || !has(self.maxReplicas) || self.minReady + <= self.maxReplicas' status: description: status is the observed state of WorkerPool properties: diff --git a/pkg/api/v1alpha1/workerpool_types.go b/pkg/api/v1alpha1/workerpool_types.go index 2350432b6..ec01428e2 100644 --- a/pkg/api/v1alpha1/workerpool_types.go +++ b/pkg/api/v1alpha1/workerpool_types.go @@ -18,8 +18,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// +kubebuilder:validation:XValidation:rule="!has(self.minReady) || !has(self.maxReplicas) || self.minReady <= self.maxReplicas",message="minReady must not exceed maxReplicas" type WorkerPoolSpec struct { - // Replicas is the number of worker pods to run. + // Replicas is the number of worker pods to run. When autoscaling is enabled + // it is owned by the autoscaler (written via the scale subresource); the + // fields below are the declarative inputs that drive it. // +required // +kubebuilder:validation:Minimum=0 Replicas int32 `json:"replicas"` @@ -28,6 +31,29 @@ type WorkerPoolSpec struct { // +kubebuilder:validation:MinLength=1 // +required AteomImage string `json:"ateomImage"` + + // MinReady is the minimum number of worker pods the autoscaler keeps the + // pool at — the reservation floor it must never scale below. When unset the + // pool may be scaled to zero. The floor is enforced by the autoscaler; the + // WorkerPool controller never clamps Replicas itself, so that the scale + // subresource keeps a single writer. + // +optional + // +kubebuilder:validation:Minimum=0 + MinReady *int32 `json:"minReady,omitempty"` + + // TargetBuffer is the desired number of idle (warm) workers the autoscaler + // keeps available to absorb resume bursts. When the idle count falls below + // this target the autoscaler provisions more workers, net of pods already + // starting. When unset, buffer-based scale-up is disabled. + // +optional + // +kubebuilder:validation:Minimum=0 + TargetBuffer *int32 `json:"targetBuffer,omitempty"` + + // MaxReplicas is the upper bound the autoscaler may grow the pool to. When + // unset the autoscaler applies no ceiling of its own. + // +optional + // +kubebuilder:validation:Minimum=0 + MaxReplicas *int32 `json:"maxReplicas,omitempty"` } type WorkerPoolStatus struct { diff --git a/pkg/api/v1alpha1/zz_generated.deepcopy.go b/pkg/api/v1alpha1/zz_generated.deepcopy.go index 4bbfe02b6..dc215517d 100644 --- a/pkg/api/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/api/v1alpha1/zz_generated.deepcopy.go @@ -318,7 +318,7 @@ func (in *WorkerPool) DeepCopyInto(out *WorkerPool) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -375,6 +375,21 @@ func (in *WorkerPoolList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WorkerPoolSpec) DeepCopyInto(out *WorkerPoolSpec) { *out = *in + if in.MinReady != nil { + in, out := &in.MinReady, &out.MinReady + *out = new(int32) + **out = **in + } + if in.TargetBuffer != nil { + in, out := &in.TargetBuffer, &out.TargetBuffer + *out = new(int32) + **out = **in + } + if in.MaxReplicas != nil { + in, out := &in.MaxReplicas, &out.MaxReplicas + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkerPoolSpec. From 5b2626c7823ee34103e90c13eecc29e9ebf05076 Mon Sep 17 00:00:00 2001 From: Omer Yahud Date: Mon, 8 Jun 2026 21:38:44 +0300 Subject: [PATCH 2/7] feat(autoscaler): add pure WorkerPool autoscaling decision logic Add internal/autoscaler with Step(): given a pool's bounds, a live occupancy observation, loop config and the clock, it returns the next desired replica count. Scale-up is immediate; scale-down is gated by a stabilization window. No Kubernetes/gRPC deps; covered by 16 unit tests. Part of #198 (Phase 2a). --- internal/autoscaler/policy.go | 149 +++++++++++++++++++ internal/autoscaler/policy_test.go | 220 +++++++++++++++++++++++++++++ 2 files changed, 369 insertions(+) create mode 100644 internal/autoscaler/policy.go create mode 100644 internal/autoscaler/policy_test.go diff --git a/internal/autoscaler/policy.go b/internal/autoscaler/policy.go new file mode 100644 index 000000000..5f0cf863a --- /dev/null +++ b/internal/autoscaler/policy.go @@ -0,0 +1,149 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package autoscaler holds the WorkerPool autoscaling control logic. This file +// is the pure decision core: given a pool's declarative bounds and a live +// observation it computes the next desired replica count. It has no Kubernetes +// or gRPC dependencies so it can be unit-tested exhaustively; the reconciler +// (see reconciler.go) supplies the observation and persists the small amount of +// timing state Step needs. +package autoscaler + +import "time" + +// Bounds are the declarative autoscaling inputs from WorkerPoolSpec. A nil +// pointer means the operator left the field unset. +type Bounds struct { + MinReady *int32 + TargetBuffer *int32 + MaxReplicas *int32 +} + +// Enabled reports whether autoscaling is configured for the pool. With neither +// minReady nor targetBuffer set, the autoscaler leaves the pool alone — replicas +// stay whatever a human (or some other tool) set on the scale subresource. +func (b Bounds) Enabled() bool { + return b.MinReady != nil || b.TargetBuffer != nil +} + +// Observation is the measured live state of a pool at decision time. +type Observation struct { + // Current is spec.replicas the autoscaler last set (the desired pod count). + Current int32 + // Free is the number of registered workers with no actor assigned (warm/idle). + Free int32 + // InFlight is pods requested but not yet registered as workers (still starting). + InFlight int32 +} + +// Config tunes the loop's deliberate up/down asymmetry. +type Config struct { + // ScaleDownStabilization is how long a shrink must be continuously wanted + // before it is applied. Scale-up ignores it. + ScaleDownStabilization time.Duration + // MaxScaleUpStep caps how many replicas a single up-step may add above the + // reservation floor. Zero means unlimited. The floor itself (MinReady) is + // always reached in one step regardless of this cap. + MaxScaleUpStep int32 +} + +// Decision is the loop's output for one tick. +type Decision struct { + // Target is the replica count to write. It equals Observation.Current when + // Changed is false. + Target int32 + Changed bool + Reason string +} + +// clamp constrains v to [MinReady, MaxReplicas] (and to a non-negative value). +// MinReady can only raise v; MaxReplicas can only lower it. Admission already +// guarantees MinReady <= MaxReplicas (CEL rule on WorkerPoolSpec). +func clamp(v int32, b Bounds) int32 { + if v < 0 { + v = 0 + } + if b.MinReady != nil && v < *b.MinReady { + v = *b.MinReady + } + if b.MaxReplicas != nil && v > *b.MaxReplicas { + v = *b.MaxReplicas + } + return v +} + +// ideal is the instantaneous target that keeps Free ≈ TargetBuffer: +// +// ideal = Current + TargetBuffer - (Free + InFlight) +// +// i.e. add the buffer deficit (when idle headroom is short) or subtract the +// surplus (when it is over-stocked), then clamp to [MinReady, MaxReplicas]. +// Netting against InFlight is the anti-windup term: pods already starting count +// toward the buffer, so the loop does not pile on scale-ups while they boot. +// +// With TargetBuffer unset there is no buffer goal, so the ideal is just Current +// clamped to the bounds — which still lets MinReady raise an under-floored pool +// and MaxReplicas cap an over-sized one. +func ideal(b Bounds, o Observation) int32 { + target := o.Current + if b.TargetBuffer != nil { + target = o.Current + *b.TargetBuffer - (o.Free + o.InFlight) + } + return clamp(target, b) +} + +// Step computes the next Decision. now is the current time; downWantedSince is +// the instant the pool first became eligible to scale down, or the zero Time if +// it is not currently eligible. Both are state the caller persists between +// ticks. Step returns the decision and the updated downWantedSince to carry +// into the next call. +// +// The asymmetry encodes the design's core constraint: +// - scale UP is latency-critical, so it is applied immediately (capped by +// MaxScaleUpStep for buffer-driven growth, but never throttled below the +// reservation floor); +// - scale DOWN is safety-critical, so it is applied only after the shrink has +// been wanted continuously for ScaleDownStabilization — any tick that no +// longer wants to shrink resets the timer; +// - the target is always within [MinReady, MaxReplicas]. +func Step(b Bounds, o Observation, c Config, now, downWantedSince time.Time) (Decision, time.Time) { + target := ideal(b, o) + + switch { + case target > o.Current: + // Scale up now. Cap buffer-driven growth, but re-clamp so the floor is + // still reached in a single step. + next := target + if c.MaxScaleUpStep > 0 && next-o.Current > c.MaxScaleUpStep { + next = clamp(o.Current+c.MaxScaleUpStep, b) + } + if next <= o.Current { + return Decision{Target: o.Current, Reason: "steady"}, time.Time{} + } + return Decision{Target: next, Changed: true, Reason: "scale up: refill buffer"}, time.Time{} + + case target < o.Current: + // Want to shrink: hold until the desire has persisted long enough. + if downWantedSince.IsZero() { + downWantedSince = now + } + if now.Sub(downWantedSince) >= c.ScaleDownStabilization { + return Decision{Target: target, Changed: true, Reason: "scale down: surplus buffer"}, time.Time{} + } + return Decision{Target: o.Current, Reason: "scale down pending stabilization"}, downWantedSince + + default: + return Decision{Target: o.Current, Reason: "steady"}, time.Time{} + } +} diff --git a/internal/autoscaler/policy_test.go b/internal/autoscaler/policy_test.go new file mode 100644 index 000000000..f186aa0d6 --- /dev/null +++ b/internal/autoscaler/policy_test.go @@ -0,0 +1,220 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package autoscaler + +import ( + "testing" + "time" +) + +func p(v int32) *int32 { return &v } + +var base = time.Unix(1_700_000_000, 0) + +func TestBoundsEnabled(t *testing.T) { + for _, tc := range []struct { + name string + b Bounds + want bool + }{ + {"none", Bounds{}, false}, + {"maxOnly", Bounds{MaxReplicas: p(5)}, false}, + {"minReady", Bounds{MinReady: p(1)}, true}, + {"targetBuffer", Bounds{TargetBuffer: p(2)}, true}, + } { + if got := tc.b.Enabled(); got != tc.want { + t.Errorf("%s: Enabled()=%v want %v", tc.name, got, tc.want) + } + } +} + +func TestStep(t *testing.T) { + const stab = 30 * time.Second + for _, tc := range []struct { + name string + b Bounds + o Observation + c Config + now time.Time + downSince time.Time + wantTarget int32 + wantChanged bool + wantDown time.Time // zero unless a timer should be carried + }{ + { + name: "buffer deficit scales up immediately", + b: Bounds{TargetBuffer: p(3)}, + o: Observation{Current: 5, Free: 0}, + now: base, + wantTarget: 8, wantChanged: true, + }, + { + name: "inflight counts toward buffer (anti-windup)", + b: Bounds{TargetBuffer: p(3)}, + o: Observation{Current: 5, Free: 0, InFlight: 3}, + now: base, + wantTarget: 5, wantChanged: false, + }, + { + name: "partial deficit", + b: Bounds{TargetBuffer: p(3)}, + o: Observation{Current: 5, Free: 1}, + now: base, + wantTarget: 7, wantChanged: true, + }, + { + name: "scale up capped by MaxScaleUpStep", + b: Bounds{TargetBuffer: p(10)}, + o: Observation{Current: 5}, + c: Config{MaxScaleUpStep: 2}, + now: base, + wantTarget: 7, wantChanged: true, + }, + { + name: "reservation floor reached in one step despite small cap", + b: Bounds{MinReady: p(10)}, + o: Observation{Current: 0}, + c: Config{MaxScaleUpStep: 2}, + now: base, + wantTarget: 10, wantChanged: true, + }, + { + name: "maxReplicas caps scale up", + b: Bounds{TargetBuffer: p(100), MaxReplicas: p(8)}, + o: Observation{Current: 5}, + now: base, + wantTarget: 8, wantChanged: true, + }, + { + name: "surplus buffer: shrink held, timer starts", + b: Bounds{TargetBuffer: p(2)}, + o: Observation{Current: 10, Free: 8}, + c: Config{ScaleDownStabilization: stab}, + now: base, + wantTarget: 10, wantChanged: false, wantDown: base, + }, + { + name: "shrink applied once stabilization elapsed", + b: Bounds{TargetBuffer: p(2)}, + o: Observation{Current: 10, Free: 8}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(stab), + downSince: base, + wantTarget: 4, wantChanged: true, + }, + { + name: "shrink still pending within window", + b: Bounds{TargetBuffer: p(2)}, + o: Observation{Current: 10, Free: 8}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(10 * time.Second), + downSince: base, + wantTarget: 10, wantChanged: false, wantDown: base, + }, + { + name: "no targetBuffer: minReady raises under-floored pool", + b: Bounds{MinReady: p(3)}, + o: Observation{Current: 1}, + now: base, + wantTarget: 3, wantChanged: true, + }, + { + name: "no targetBuffer: maxReplicas trims oversized pool after window", + b: Bounds{MaxReplicas: p(5)}, + o: Observation{Current: 8}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(stab), + downSince: base, + wantTarget: 5, wantChanged: true, + }, + { + name: "no autoscaling fields: steady no-op", + b: Bounds{}, + o: Observation{Current: 4}, + now: base, + wantTarget: 4, wantChanged: false, + }, + { + name: "reaching steady clears a pending shrink timer", + b: Bounds{TargetBuffer: p(2)}, + o: Observation{Current: 4, Free: 2}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(5 * time.Second), + downSince: base, // a shrink was pending... + wantTarget: 4, wantChanged: false, // ...but ideal now equals current, so timer resets + }, + { + name: "minReady=0 allows shrink to zero after window", + b: Bounds{MinReady: p(0), TargetBuffer: p(0)}, + o: Observation{Current: 3, Free: 3}, + c: Config{ScaleDownStabilization: stab}, + now: base.Add(stab), + downSince: base, + wantTarget: 0, wantChanged: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + got, gotDown := Step(tc.b, tc.o, tc.c, tc.now, tc.downSince) + if got.Target != tc.wantTarget || got.Changed != tc.wantChanged { + t.Errorf("Step => {Target:%d Changed:%v Reason:%q}, want {Target:%d Changed:%v}", + got.Target, got.Changed, got.Reason, tc.wantTarget, tc.wantChanged) + } + if !gotDown.Equal(tc.wantDown) { + t.Errorf("downWantedSince => %v, want %v", gotDown, tc.wantDown) + } + }) + } +} + +// TestStepScaleDownLifecycle walks the hysteresis state machine across ticks. +func TestStepScaleDownLifecycle(t *testing.T) { + b := Bounds{TargetBuffer: p(2)} + c := Config{ScaleDownStabilization: 30 * time.Second} + surplus := Observation{Current: 10, Free: 8} // ideal 4, wants shrink + + // Tick 1: shrink wanted, timer starts, held. + d, down := Step(b, surplus, c, base, time.Time{}) + if d.Changed || !down.Equal(base) { + t.Fatalf("tick1: got changed=%v down=%v, want held with timer=%v", d.Changed, down, base) + } + // Tick 2: still within window, still held, timer carried. + d, down = Step(b, surplus, c, base.Add(10*time.Second), down) + if d.Changed || !down.Equal(base) { + t.Fatalf("tick2: got changed=%v down=%v, want held", d.Changed, down) + } + // Tick 3: window elapsed, shrink applied, timer cleared. + d, down = Step(b, surplus, c, base.Add(31*time.Second), down) + if !d.Changed || d.Target != 4 || !down.IsZero() { + t.Fatalf("tick3: got {changed:%v target:%d} down=%v, want shrink to 4 + cleared timer", d.Changed, d.Target, down) + } +} + +// TestStepScaleDownTimerResetsOnDemand verifies a returning burst cancels a +// pending shrink so a brief lull never throws away warm capacity. +func TestStepScaleDownTimerResetsOnDemand(t *testing.T) { + b := Bounds{TargetBuffer: p(2)} + c := Config{ScaleDownStabilization: 30 * time.Second} + + // Shrink wanted: timer starts. + _, down := Step(b, Observation{Current: 10, Free: 8}, c, base, time.Time{}) + if !down.Equal(base) { + t.Fatalf("expected timer to start at %v, got %v", base, down) + } + // Demand returns (free drops below buffer): scale up now, timer cleared. + d, down := Step(b, Observation{Current: 10, Free: 0}, c, base.Add(5*time.Second), down) + if !d.Changed || d.Target != 12 || !down.IsZero() { + t.Fatalf("burst: got {changed:%v target:%d} down=%v, want scale up to 12 + cleared timer", d.Changed, d.Target, down) + } +} From ee0e2d1d291166674c3370a6df7d9266a1b2077b Mon Sep 17 00:00:00 2001 From: Omer Yahud Date: Mon, 8 Jun 2026 23:49:46 +0300 Subject: [PATCH 3/7] feat(autoscaler): add WorkerPool autoscaler reconciler Add WorkerPoolAutoscaler in internal/controllers: the single writer of spec.replicas for autoscaled pools. It reads each pool's bounds, measures occupancy via ateapi ListWorkers, runs the decision policy, and patches replicas. WorkerPoolReconciler still owns the Deployment and status, so the two controllers write disjoint fields. Re-evaluation is poll-driven (RequeueAfter) as the down-path and safety net; the reactive up-path is added separately. Wired into atecontroller, reusing its ateapi client. Part of #198 (Phase 2b). --- cmd/atecontroller/main.go | 16 ++ internal/controllers/workerpool_autoscaler.go | 207 ++++++++++++++++++ .../controllers/workerpool_autoscaler_test.go | 156 +++++++++++++ 3 files changed, 379 insertions(+) create mode 100644 internal/controllers/workerpool_autoscaler.go create mode 100644 internal/controllers/workerpool_autoscaler_test.go diff --git a/cmd/atecontroller/main.go b/cmd/atecontroller/main.go index f7e922273..746cd7537 100644 --- a/cmd/atecontroller/main.go +++ b/cmd/atecontroller/main.go @@ -16,7 +16,9 @@ package main import ( "crypto/tls" "os" + "time" + "github.com/agent-substrate/substrate/internal/autoscaler" "github.com/agent-substrate/substrate/internal/controllers" clientv1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" @@ -88,6 +90,20 @@ func main() { os.Exit(1) } + if err = (&controllers.WorkerPoolAutoscaler{ + Client: mgr.GetClient(), + AteClient: ateapiClient, + Config: autoscaler.Config{ + // TODO: surface these as flags once we tune them in a cluster. + ScaleDownStabilization: 60 * time.Second, + MaxScaleUpStep: 0, // unlimited + }, + Interval: 10 * time.Second, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "WorkerPoolAutoscaler") + os.Exit(1) + } + //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/internal/controllers/workerpool_autoscaler.go b/internal/controllers/workerpool_autoscaler.go new file mode 100644 index 000000000..d07311e7a --- /dev/null +++ b/internal/controllers/workerpool_autoscaler.go @@ -0,0 +1,207 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "context" + "fmt" + "sync" + "time" + + k8errors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/agent-substrate/substrate/internal/autoscaler" + atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" +) + +// defaultAutoscaleInterval is how often each autoscaled pool is re-evaluated +// when nothing else triggers a reconcile. Occupancy lives in ateapi rather than +// in Kubernetes, so the loop polls on this cadence instead of waking on events. +const defaultAutoscaleInterval = 10 * time.Second + +// WorkerPoolAutoscaler is the single writer of spec.replicas for autoscaled +// WorkerPools. Each tick it reads a pool's declarative bounds, measures live +// occupancy from ateapi, runs the decision policy (internal/autoscaler), and +// patches the replica count. It deliberately does not touch the Deployment — +// WorkerPoolReconciler still materializes spec.replicas — so the two +// controllers own disjoint fields and never fight. +type WorkerPoolAutoscaler struct { + client.Client + + // AteClient is the control-plane API used to read per-pool worker occupancy. + AteClient ateapipb.ControlClient + // Config tunes the decision policy (stabilization window, max up-step). + Config autoscaler.Config + // Interval is the re-evaluation cadence. Defaults to defaultAutoscaleInterval. + Interval time.Duration + + // now is the clock, overridable in tests. + now func() time.Time + // downSince remembers, per pool, when a scale-down first became eligible, so + // the stabilization window survives across reconciles. Lost on restart, which + // is safe: it merely restarts the (conservative) down timer. + mu sync.Mutex + downSince map[types.NamespacedName]time.Time +} + +//+kubebuilder:rbac:groups=ate.dev,resources=workerpools,verbs=get;list;watch;update;patch + +// Reconcile evaluates one WorkerPool and, if autoscaling is configured, moves +// spec.replicas toward the policy's target. +func (r *WorkerPoolAutoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := log.FromContext(ctx) + + wp := &atev1alpha1.WorkerPool{} + if err := r.Get(ctx, req.NamespacedName, wp); err != nil { + if k8errors.IsNotFound(err) { + r.forget(req.NamespacedName) + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("failed to get worker pool %q: %w", req.NamespacedName, err) + } + + if !wp.GetDeletionTimestamp().IsZero() { + r.forget(req.NamespacedName) + return ctrl.Result{}, nil + } + + bounds := autoscaler.Bounds{ + MinReady: wp.Spec.MinReady, + TargetBuffer: wp.Spec.TargetBuffer, + MaxReplicas: wp.Spec.MaxReplicas, + } + if !bounds.Enabled() { + // Pool is not autoscaled: leave spec.replicas to whoever owns it and stop + // requeuing it. + r.forget(req.NamespacedName) + return ctrl.Result{}, nil + } + + obs, err := r.observe(ctx, wp) + if err != nil { + return ctrl.Result{}, fmt.Errorf("while observing pool occupancy: %w", err) + } + + decision, downSince := autoscaler.Step(bounds, obs, r.Config, r.nowFn(), r.loadDownSince(req.NamespacedName)) + r.storeDownSince(req.NamespacedName, downSince) + + if decision.Changed { + if err := r.scaleTo(ctx, wp, decision.Target); err != nil { + return ctrl.Result{}, fmt.Errorf("while scaling worker pool: %w", err) + } + log.Info("autoscaled WorkerPool", + "from", obs.Current, "to", decision.Target, + "free", obs.Free, "inFlight", obs.InFlight, "reason", decision.Reason) + } + + return ctrl.Result{RequeueAfter: r.interval()}, nil +} + +// observe measures the pool's live occupancy from ateapi. Free is the number of +// registered workers with no actor; InFlight is pods requested but not yet +// registered (current desired count minus everything that has registered). +func (r *WorkerPoolAutoscaler) observe(ctx context.Context, wp *atev1alpha1.WorkerPool) (autoscaler.Observation, error) { + resp, err := r.AteClient.ListWorkers(ctx, &ateapipb.ListWorkersRequest{}) + if err != nil { + return autoscaler.Observation{}, fmt.Errorf("listing workers: %w", err) + } + + var registered, free int32 + for _, w := range resp.GetWorkers() { + if w.GetWorkerNamespace() != wp.Namespace || w.GetWorkerPool() != wp.Name { + continue + } + registered++ + if w.GetActorId() == "" { + free++ + } + } + + inFlight := wp.Spec.Replicas - registered + if inFlight < 0 { + inFlight = 0 + } + return autoscaler.Observation{Current: wp.Spec.Replicas, Free: free, InFlight: inFlight}, nil +} + +// scaleTo patches only spec.replicas — the field the scale subresource maps to — +// leaving every other field to its owner. +func (r *WorkerPoolAutoscaler) scaleTo(ctx context.Context, wp *atev1alpha1.WorkerPool, target int32) error { + patch := client.MergeFrom(wp.DeepCopy()) + wp.Spec.Replicas = target + return r.Patch(ctx, wp, patch) +} + +func (r *WorkerPoolAutoscaler) nowFn() time.Time { + if r.now != nil { + return r.now() + } + return time.Now() +} + +func (r *WorkerPoolAutoscaler) interval() time.Duration { + if r.Interval > 0 { + return r.Interval + } + return defaultAutoscaleInterval +} + +func (r *WorkerPoolAutoscaler) loadDownSince(key types.NamespacedName) time.Time { + r.mu.Lock() + defer r.mu.Unlock() + return r.downSince[key] +} + +func (r *WorkerPoolAutoscaler) storeDownSince(key types.NamespacedName, t time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + if r.downSince == nil { + r.downSince = map[types.NamespacedName]time.Time{} + } + if t.IsZero() { + delete(r.downSince, key) + return + } + r.downSince[key] = t +} + +func (r *WorkerPoolAutoscaler) forget(key types.NamespacedName) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.downSince, key) +} + +// SetupWithManager registers the autoscaler. It uses a distinct controller name +// (WorkerPoolReconciler also watches WorkerPool) and a generation predicate so +// status-only writes don't wake it — periodic requeue drives the polling. +func (r *WorkerPoolAutoscaler) SetupWithManager(mgr ctrl.Manager) error { + r.mu.Lock() + if r.downSince == nil { + r.downSince = map[types.NamespacedName]time.Time{} + } + r.mu.Unlock() + + return ctrl.NewControllerManagedBy(mgr). + For(&atev1alpha1.WorkerPool{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Named("workerpool-autoscaler"). + Complete(r) +} diff --git a/internal/controllers/workerpool_autoscaler_test.go b/internal/controllers/workerpool_autoscaler_test.go new file mode 100644 index 000000000..f5bc761bf --- /dev/null +++ b/internal/controllers/workerpool_autoscaler_test.go @@ -0,0 +1,156 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "context" + "testing" + "time" + + "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/agent-substrate/substrate/internal/autoscaler" + atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" +) + +func ptrInt32(v int32) *int32 { return &v } + +// stubControl is a ControlClient whose ListWorkers returns a fixed worker set. +// The autoscaler only calls ListWorkers; any other method would panic via the +// nil embedded interface, which keeps the stub honest about what it relies on. +type stubControl struct { + ateapipb.ControlClient + workers []*ateapipb.Worker +} + +func (s *stubControl) ListWorkers(context.Context, *ateapipb.ListWorkersRequest, ...grpc.CallOption) (*ateapipb.ListWorkersResponse, error) { + return &ateapipb.ListWorkersResponse{Workers: s.workers}, nil +} + +// poolWorkers builds `total` workers for a pool, the first `occupied` of which +// carry an actor (the rest are free/idle). +func poolWorkers(ns, pool string, total, occupied int) []*ateapipb.Worker { + ws := make([]*ateapipb.Worker, 0, total) + for i := 0; i < total; i++ { + actor := "" + if i < occupied { + actor = "actor" + } + ws = append(ws, &ateapipb.Worker{WorkerNamespace: ns, WorkerPool: pool, ActorId: actor}) + } + return ws +} + +func TestAutoscalerScalesUpToRefillBuffer(t *testing.T) { + wp := makeWorkerPool("autoscale-up", "default", 5, "ateom:v1") + wp.Spec.TargetBuffer = ptrInt32(3) + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + + // 5 registered, 1 free, 0 in-flight => ideal = 5 + 3 - 1 = 7. + r := &WorkerPoolAutoscaler{ + Client: k8sClient, + AteClient: &stubControl{workers: poolWorkers("default", "autoscale-up", 5, 4)}, + now: func() time.Time { return time.Unix(1_700_000_000, 0) }, + } + key := types.NamespacedName{Namespace: "default", Name: "autoscale-up"} + if _, err := r.Reconcile(testCtx, ctrl.Request{NamespacedName: key}); err != nil { + t.Fatalf("reconcile: %v", err) + } + + got := &atev1alpha1.WorkerPool{} + if err := k8sClient.Get(testCtx, key, got); err != nil { + t.Fatal(err) + } + if got.Spec.Replicas != 7 { + t.Fatalf("replicas = %d, want 7", got.Spec.Replicas) + } +} + +func TestAutoscalerSkipsUnconfiguredPool(t *testing.T) { + wp := makeWorkerPool("autoscale-skip", "default", 4, "ateom:v1") // no autoscaling fields + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + + r := &WorkerPoolAutoscaler{ + Client: k8sClient, + AteClient: &stubControl{}, // ListWorkers must not be reached + now: func() time.Time { return time.Unix(1_700_000_000, 0) }, + } + key := types.NamespacedName{Namespace: "default", Name: "autoscale-skip"} + res, err := r.Reconcile(testCtx, ctrl.Request{NamespacedName: key}) + if err != nil { + t.Fatalf("reconcile: %v", err) + } + if res.RequeueAfter != 0 { + t.Fatalf("unconfigured pool should not requeue, got %v", res.RequeueAfter) + } + + got := &atev1alpha1.WorkerPool{} + if err := k8sClient.Get(testCtx, key, got); err != nil { + t.Fatal(err) + } + if got.Spec.Replicas != 4 { + t.Fatalf("replicas = %d, want 4 (unchanged)", got.Spec.Replicas) + } +} + +func TestAutoscalerScaleDownAfterStabilization(t *testing.T) { + wp := makeWorkerPool("autoscale-down", "default", 10, "ateom:v1") + wp.Spec.TargetBuffer = ptrInt32(2) + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + + // 10 registered, 8 free => ideal = 10 + 2 - 8 = 4 (wants to shrink). + base := time.Unix(1_700_000_000, 0) + now := base + r := &WorkerPoolAutoscaler{ + Client: k8sClient, + AteClient: &stubControl{workers: poolWorkers("default", "autoscale-down", 10, 2)}, + Config: autoscaler.Config{ScaleDownStabilization: 30 * time.Second}, + now: func() time.Time { return now }, + } + key := types.NamespacedName{Namespace: "default", Name: "autoscale-down"} + + // Tick 1: shrink wanted but within the window => held at 10. + if _, err := r.Reconcile(testCtx, ctrl.Request{NamespacedName: key}); err != nil { + t.Fatalf("reconcile tick1: %v", err) + } + got := &atev1alpha1.WorkerPool{} + if err := k8sClient.Get(testCtx, key, got); err != nil { + t.Fatal(err) + } + if got.Spec.Replicas != 10 { + t.Fatalf("after tick1 replicas = %d, want 10 (held)", got.Spec.Replicas) + } + + // Tick 2: window elapsed => shrink applied to 4. + now = base.Add(31 * time.Second) + if _, err := r.Reconcile(testCtx, ctrl.Request{NamespacedName: key}); err != nil { + t.Fatalf("reconcile tick2: %v", err) + } + if err := k8sClient.Get(testCtx, key, got); err != nil { + t.Fatal(err) + } + if got.Spec.Replicas != 4 { + t.Fatalf("after tick2 replicas = %d, want 4 (applied)", got.Spec.Replicas) + } +} From 72e96a46546640ad3445ad9e86e9afe925d230fe Mon Sep 17 00:00:00 2001 From: Omer Yahud Date: Mon, 8 Jun 2026 23:49:46 +0300 Subject: [PATCH 4/7] feat(ateapi): emit capacity-pressure signal on resume miss Add a WatchCapacityPressure server-streaming RPC and an in-process pub/sub hub. When AssignWorkerStep finds no free worker for a pool it publishes a pool-scoped CapacityPressureEvent at the request edge; subscribers stream them. Publish is non-blocking (lossy on a full subscriber buffer) so it never slows the resume path. Producer side of the reactive scale-up path; the autoscaler subscribes separately. Part of #198 (Phase 3a). --- .../internal/controlapi/capacity_pressure.go | 79 +++++++ .../controlapi/capacity_pressure_test.go | 75 +++++++ cmd/ateapi/internal/controlapi/service.go | 7 +- .../controlapi/watch_capacity_pressure.go | 43 ++++ cmd/ateapi/internal/controlapi/workflow.go | 6 +- .../internal/controlapi/workflow_resume.go | 8 +- pkg/proto/ateapipb/ateapi.pb.go | 208 +++++++++++++----- pkg/proto/ateapipb/ateapi.proto | 14 ++ pkg/proto/ateapipb/ateapi_grpc.pb.go | 66 +++++- 9 files changed, 439 insertions(+), 67 deletions(-) create mode 100644 cmd/ateapi/internal/controlapi/capacity_pressure.go create mode 100644 cmd/ateapi/internal/controlapi/capacity_pressure_test.go create mode 100644 cmd/ateapi/internal/controlapi/watch_capacity_pressure.go diff --git a/cmd/ateapi/internal/controlapi/capacity_pressure.go b/cmd/ateapi/internal/controlapi/capacity_pressure.go new file mode 100644 index 000000000..b6ebd773e --- /dev/null +++ b/cmd/ateapi/internal/controlapi/capacity_pressure.go @@ -0,0 +1,79 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import "sync" + +// poolKey identifies a worker pool by namespace and name. +type poolKey struct { + namespace string + name string +} + +// CapacityPressureHub fans out capacity-pressure notifications — a pool had no +// free worker for a resume — to any number of subscribers (the WatchCapacity- +// Pressure RPC handlers). Publish is called on the resume hot path and must +// never block: a subscriber whose buffer is full simply misses the event, and +// the autoscaler's periodic reconcile is the backstop. +type CapacityPressureHub struct { + mu sync.Mutex + nextID int + subs map[int]chan poolKey +} + +// NewCapacityPressureHub returns an empty hub. +func NewCapacityPressureHub() *CapacityPressureHub { + return &CapacityPressureHub{subs: make(map[int]chan poolKey)} +} + +// Subscribe registers a subscriber and returns its event channel plus a cancel +// func that unregisters and closes the channel. cancel is idempotent. The +// channel is buffered so short bursts aren't dropped, and lossy beyond that by +// design. +func (h *CapacityPressureHub) Subscribe() (<-chan poolKey, func()) { + ch := make(chan poolKey, 64) + + h.mu.Lock() + id := h.nextID + h.nextID++ + h.subs[id] = ch + h.mu.Unlock() + + var once sync.Once + cancel := func() { + once.Do(func() { + h.mu.Lock() + defer h.mu.Unlock() + delete(h.subs, id) + close(ch) + }) + } + return ch, cancel +} + +// Publish notifies every subscriber that the named pool had no free worker. It +// never blocks: a full subscriber buffer drops the event. +func (h *CapacityPressureHub) Publish(namespace, name string) { + key := poolKey{namespace: namespace, name: name} + + h.mu.Lock() + defer h.mu.Unlock() + for _, ch := range h.subs { + select { + case ch <- key: + default: + } + } +} diff --git a/cmd/ateapi/internal/controlapi/capacity_pressure_test.go b/cmd/ateapi/internal/controlapi/capacity_pressure_test.go new file mode 100644 index 000000000..46e210240 --- /dev/null +++ b/cmd/ateapi/internal/controlapi/capacity_pressure_test.go @@ -0,0 +1,75 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import ( + "testing" + "time" +) + +func TestCapacityPressureHubFanOut(t *testing.T) { + h := NewCapacityPressureHub() + a, cancelA := h.Subscribe() + defer cancelA() + b, cancelB := h.Subscribe() + defer cancelB() + + h.Publish("ns", "pool") + + for i, ch := range []<-chan poolKey{a, b} { + select { + case got := <-ch: + if got.namespace != "ns" || got.name != "pool" { + t.Fatalf("subscriber %d: got %+v, want {ns pool}", i, got) + } + case <-time.After(time.Second): + t.Fatalf("subscriber %d: no event delivered", i) + } + } +} + +func TestCapacityPressureHubUnsubscribe(t *testing.T) { + h := NewCapacityPressureHub() + ch, cancel := h.Subscribe() + + cancel() + // Publishing after unsubscribe must not panic, and the channel is closed. + h.Publish("ns", "pool") + if _, ok := <-ch; ok { + t.Fatal("channel should be closed after cancel") + } + // cancel is idempotent. + cancel() +} + +func TestCapacityPressureHubPublishNeverBlocks(t *testing.T) { + h := NewCapacityPressureHub() + _, cancel := h.Subscribe() // a subscriber that never drains + defer cancel() + + done := make(chan struct{}) + go func() { + for i := 0; i < 10_000; i++ { + h.Publish("ns", "pool") // must drop, not block, once the buffer fills + } + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Publish blocked on a full subscriber buffer") + } +} diff --git a/cmd/ateapi/internal/controlapi/service.go b/cmd/ateapi/internal/controlapi/service.go index 39841729e..f60775495 100644 --- a/cmd/ateapi/internal/controlapi/service.go +++ b/cmd/ateapi/internal/controlapi/service.go @@ -28,17 +28,22 @@ type Service struct { dialer *AteletDialer actorTemplateLister listersv1alpha1.ActorTemplateLister actorWorkflow *ActorWorkflow + pressure *CapacityPressureHub } var _ ateapipb.ControlServer = (*Service)(nil) // NewService creates a service. func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer, kubeClient kubernetes.Interface) *Service { + // The hub is shared: the resume workflow publishes capacity-pressure events + // to it, and the WatchCapacityPressure RPC streams them to subscribers. + pressure := NewCapacityPressureHub() s := &Service{ persistence: persistence, actorTemplateLister: actorTemplateLister, dialer: dialer, - actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient), + actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient, pressure), + pressure: pressure, } return s diff --git a/cmd/ateapi/internal/controlapi/watch_capacity_pressure.go b/cmd/ateapi/internal/controlapi/watch_capacity_pressure.go new file mode 100644 index 000000000..eb922a2d2 --- /dev/null +++ b/cmd/ateapi/internal/controlapi/watch_capacity_pressure.go @@ -0,0 +1,43 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import ( + "google.golang.org/grpc" + + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" +) + +// WatchCapacityPressure streams a CapacityPressureEvent every time a pool has +// no free worker for a resume, until the client disconnects. +func (s *Service) WatchCapacityPressure(_ *ateapipb.WatchCapacityPressureRequest, stream grpc.ServerStreamingServer[ateapipb.CapacityPressureEvent]) error { + events, cancel := s.pressure.Subscribe() + defer cancel() + + ctx := stream.Context() + for { + select { + case <-ctx.Done(): + return nil + case key := <-events: + if err := stream.Send(&ateapipb.CapacityPressureEvent{ + WorkerNamespace: key.namespace, + WorkerPool: key.name, + }); err != nil { + return err + } + } + } +} diff --git a/cmd/ateapi/internal/controlapi/workflow.go b/cmd/ateapi/internal/controlapi/workflow.go index 37abb258a..f92af7684 100644 --- a/cmd/ateapi/internal/controlapi/workflow.go +++ b/cmd/ateapi/internal/controlapi/workflow.go @@ -119,16 +119,18 @@ type ActorWorkflow struct { actorTemplateLister listersv1alpha1.ActorTemplateLister kubeClient kubernetes.Interface secretCache *envSecretCache + pressure *CapacityPressureHub } // NewActorWorkflow creates a new ActorWorkflow. -func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface) *ActorWorkflow { +func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface, pressure *CapacityPressureHub) *ActorWorkflow { return &ActorWorkflow{ store: store, dialer: dialer, actorTemplateLister: actorTemplateLister, kubeClient: kubeClient, secretCache: newEnvSecretCache(envSecretCacheTTL), + pressure: pressure, } } @@ -150,7 +152,7 @@ func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) ( steps := []WorkflowStep[*ResumeInput, *ResumeState]{ &LoadActorForResumeStep{store: w.store, actorTemplateLister: w.actorTemplateLister}, - &AssignWorkerStep{store: w.store}, + &AssignWorkerStep{store: w.store, pressure: w.pressure}, &CallAteletRestoreStep{dialer: w.dialer, kubeClient: w.kubeClient, secretCache: w.secretCache}, &FinalizeRunningStep{store: w.store}, } diff --git a/cmd/ateapi/internal/controlapi/workflow_resume.go b/cmd/ateapi/internal/controlapi/workflow_resume.go index 9f06eec0d..bad377d13 100644 --- a/cmd/ateapi/internal/controlapi/workflow_resume.go +++ b/cmd/ateapi/internal/controlapi/workflow_resume.go @@ -77,7 +77,8 @@ func (s *LoadActorForResumeStep) Execute(ctx context.Context, input *ResumeInput func (s *LoadActorForResumeStep) RetryBackoff() *wait.Backoff { return nil } type AssignWorkerStep struct { - store store.Interface + store store.Interface + pressure *CapacityPressureHub } func (s *AssignWorkerStep) Name() string { return "AssignWorker" } @@ -105,6 +106,11 @@ func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, stat if assignedWorker == nil { pickedWorker := s.findFreeWorker(workers, state.ActorTemplate.Spec.WorkerPoolRef.Namespace, state.ActorTemplate.Spec.WorkerPoolRef.Name) if pickedWorker == nil { + // Signal capacity pressure for this pool so the autoscaler can react + // at the request edge instead of waiting for its next poll. + if s.pressure != nil { + s.pressure.Publish(state.ActorTemplate.Spec.WorkerPoolRef.Namespace, state.ActorTemplate.Spec.WorkerPoolRef.Name) + } return status.Errorf(codes.FailedPrecondition, "no free workers available") } diff --git a/pkg/proto/ateapipb/ateapi.pb.go b/pkg/proto/ateapipb/ateapi.pb.go index a11f341fa..a3a992875 100644 --- a/pkg/proto/ateapipb/ateapi.pb.go +++ b/pkg/proto/ateapipb/ateapi.pb.go @@ -1052,6 +1052,96 @@ func (*DebugClearResponse) Descriptor() ([]byte, []int) { return file_ateapi_proto_rawDescGZIP(), []int{17} } +type WatchCapacityPressureRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WatchCapacityPressureRequest) Reset() { + *x = WatchCapacityPressureRequest{} + mi := &file_ateapi_proto_msgTypes[18] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WatchCapacityPressureRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WatchCapacityPressureRequest) ProtoMessage() {} + +func (x *WatchCapacityPressureRequest) ProtoReflect() protoreflect.Message { + mi := &file_ateapi_proto_msgTypes[18] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WatchCapacityPressureRequest.ProtoReflect.Descriptor instead. +func (*WatchCapacityPressureRequest) Descriptor() ([]byte, []int) { + return file_ateapi_proto_rawDescGZIP(), []int{18} +} + +// CapacityPressureEvent signals that a worker pool had no free worker for a +// resume, identifying the pool so a consumer can react per-pool. +type CapacityPressureEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + WorkerNamespace string `protobuf:"bytes,1,opt,name=worker_namespace,json=workerNamespace,proto3" json:"worker_namespace,omitempty"` + WorkerPool string `protobuf:"bytes,2,opt,name=worker_pool,json=workerPool,proto3" json:"worker_pool,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CapacityPressureEvent) Reset() { + *x = CapacityPressureEvent{} + mi := &file_ateapi_proto_msgTypes[19] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CapacityPressureEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CapacityPressureEvent) ProtoMessage() {} + +func (x *CapacityPressureEvent) ProtoReflect() protoreflect.Message { + mi := &file_ateapi_proto_msgTypes[19] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CapacityPressureEvent.ProtoReflect.Descriptor instead. +func (*CapacityPressureEvent) Descriptor() ([]byte, []int) { + return file_ateapi_proto_rawDescGZIP(), []int{19} +} + +func (x *CapacityPressureEvent) GetWorkerNamespace() string { + if x != nil { + return x.WorkerNamespace + } + return "" +} + +func (x *CapacityPressureEvent) GetWorkerPool() string { + if x != nil { + return x.WorkerPool + } + return "" +} + type MintJWTRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Audience []string `protobuf:"bytes,1,rep,name=audience,proto3" json:"audience,omitempty"` @@ -1064,7 +1154,7 @@ type MintJWTRequest struct { func (x *MintJWTRequest) Reset() { *x = MintJWTRequest{} - mi := &file_ateapi_proto_msgTypes[18] + mi := &file_ateapi_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1076,7 +1166,7 @@ func (x *MintJWTRequest) String() string { func (*MintJWTRequest) ProtoMessage() {} func (x *MintJWTRequest) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[18] + mi := &file_ateapi_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1089,7 +1179,7 @@ func (x *MintJWTRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use MintJWTRequest.ProtoReflect.Descriptor instead. func (*MintJWTRequest) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{18} + return file_ateapi_proto_rawDescGZIP(), []int{20} } func (x *MintJWTRequest) GetAudience() []string { @@ -1149,7 +1239,7 @@ type MintJWTResponse struct { func (x *MintJWTResponse) Reset() { *x = MintJWTResponse{} - mi := &file_ateapi_proto_msgTypes[19] + mi := &file_ateapi_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1161,7 +1251,7 @@ func (x *MintJWTResponse) String() string { func (*MintJWTResponse) ProtoMessage() {} func (x *MintJWTResponse) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[19] + mi := &file_ateapi_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1174,7 +1264,7 @@ func (x *MintJWTResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MintJWTResponse.ProtoReflect.Descriptor instead. func (*MintJWTResponse) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{19} + return file_ateapi_proto_rawDescGZIP(), []int{21} } func (x *MintJWTResponse) GetSessionJwt() string { @@ -1199,7 +1289,7 @@ type MintCertRequest struct { func (x *MintCertRequest) Reset() { *x = MintCertRequest{} - mi := &file_ateapi_proto_msgTypes[20] + mi := &file_ateapi_proto_msgTypes[22] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1211,7 +1301,7 @@ func (x *MintCertRequest) String() string { func (*MintCertRequest) ProtoMessage() {} func (x *MintCertRequest) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[20] + mi := &file_ateapi_proto_msgTypes[22] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1224,7 +1314,7 @@ func (x *MintCertRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use MintCertRequest.ProtoReflect.Descriptor instead. func (*MintCertRequest) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{20} + return file_ateapi_proto_rawDescGZIP(), []int{22} } func (x *MintCertRequest) GetAppId() string { @@ -1267,7 +1357,7 @@ type MintCertResponse struct { func (x *MintCertResponse) Reset() { *x = MintCertResponse{} - mi := &file_ateapi_proto_msgTypes[21] + mi := &file_ateapi_proto_msgTypes[23] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1279,7 +1369,7 @@ func (x *MintCertResponse) String() string { func (*MintCertResponse) ProtoMessage() {} func (x *MintCertResponse) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[21] + mi := &file_ateapi_proto_msgTypes[23] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1292,7 +1382,7 @@ func (x *MintCertResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MintCertResponse.ProtoReflect.Descriptor instead. func (*MintCertResponse) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{21} + return file_ateapi_proto_rawDescGZIP(), []int{23} } func (x *MintCertResponse) GetSessionCertificates() [][]byte { @@ -1372,7 +1462,12 @@ const file_ateapi_proto_rawDesc = "" + "\aversion\x18\b \x01(\x03R\aversion\x12$\n" + "\x0eworker_pod_uid\x18\t \x01(\tR\fworkerPodUid\"\x13\n" + "\x11DebugClearRequest\"\x14\n" + - "\x12DebugClearResponse\"{\n" + + "\x12DebugClearResponse\"\x1e\n" + + "\x1cWatchCapacityPressureRequest\"c\n" + + "\x15CapacityPressureEvent\x12)\n" + + "\x10worker_namespace\x18\x01 \x01(\tR\x0fworkerNamespace\x12\x1f\n" + + "\vworker_pool\x18\x02 \x01(\tR\n" + + "workerPool\"{\n" + "\x0eMintJWTRequest\x12\x1a\n" + "\baudience\x18\x01 \x03(\tR\baudience\x12\x15\n" + "\x06app_id\x18\x02 \x01(\tR\x05appId\x12\x17\n" + @@ -1389,7 +1484,7 @@ const file_ateapi_proto_rawDesc = "" + "session_id\x18\x03 \x01(\tR\tsessionId\x12>\n" + "\x1bcertificate_signing_request\x18\x04 \x01(\fR\x19certificateSigningRequest\"E\n" + "\x10MintCertResponse\x121\n" + - "\x14session_certificates\x18\x01 \x03(\fR\x13sessionCertificates2\xcd\x04\n" + + "\x14session_certificates\x18\x01 \x03(\fR\x13sessionCertificates2\xaf\x05\n" + "\aControl\x12?\n" + "\bGetActor\x12\x17.ateapi.GetActorRequest\x1a\x18.ateapi.GetActorResponse\"\x00\x12H\n" + "\vCreateActor\x12\x1a.ateapi.CreateActorRequest\x1a\x1b.ateapi.CreateActorResponse\"\x00\x12K\n" + @@ -1400,7 +1495,8 @@ const file_ateapi_proto_rawDesc = "" + "\n" + "ListActors\x12\x19.ateapi.ListActorsRequest\x1a\x1a.ateapi.ListActorsResponse\"\x00\x12E\n" + "\n" + - "DebugClear\x12\x19.ateapi.DebugClearRequest\x1a\x1a.ateapi.DebugClearResponse\"\x002\x8c\x01\n" + + "DebugClear\x12\x19.ateapi.DebugClearRequest\x1a\x1a.ateapi.DebugClearResponse\"\x00\x12`\n" + + "\x15WatchCapacityPressure\x12$.ateapi.WatchCapacityPressureRequest\x1a\x1d.ateapi.CapacityPressureEvent\"\x000\x012\x8c\x01\n" + "\x0fSessionIdentity\x12:\n" + "\aMintJWT\x12\x16.ateapi.MintJWTRequest\x1a\x17.ateapi.MintJWTResponse\x12=\n" + "\bMintCert\x12\x17.ateapi.MintCertRequest\x1a\x18.ateapi.MintCertResponseB9Z7github.com/agent-substrate/substrate/pkg/proto/ateapipbb\x06proto3" @@ -1418,31 +1514,33 @@ func file_ateapi_proto_rawDescGZIP() []byte { } var file_ateapi_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_ateapi_proto_msgTypes = make([]protoimpl.MessageInfo, 22) +var file_ateapi_proto_msgTypes = make([]protoimpl.MessageInfo, 24) var file_ateapi_proto_goTypes = []any{ - (Actor_Status)(0), // 0: ateapi.Actor.Status - (*Actor)(nil), // 1: ateapi.Actor - (*GetActorRequest)(nil), // 2: ateapi.GetActorRequest - (*GetActorResponse)(nil), // 3: ateapi.GetActorResponse - (*CreateActorRequest)(nil), // 4: ateapi.CreateActorRequest - (*CreateActorResponse)(nil), // 5: ateapi.CreateActorResponse - (*SuspendActorRequest)(nil), // 6: ateapi.SuspendActorRequest - (*SuspendActorResponse)(nil), // 7: ateapi.SuspendActorResponse - (*ResumeActorRequest)(nil), // 8: ateapi.ResumeActorRequest - (*ResumeActorResponse)(nil), // 9: ateapi.ResumeActorResponse - (*DeleteActorRequest)(nil), // 10: ateapi.DeleteActorRequest - (*DeleteActorResponse)(nil), // 11: ateapi.DeleteActorResponse - (*ListWorkersRequest)(nil), // 12: ateapi.ListWorkersRequest - (*ListWorkersResponse)(nil), // 13: ateapi.ListWorkersResponse - (*ListActorsRequest)(nil), // 14: ateapi.ListActorsRequest - (*ListActorsResponse)(nil), // 15: ateapi.ListActorsResponse - (*Worker)(nil), // 16: ateapi.Worker - (*DebugClearRequest)(nil), // 17: ateapi.DebugClearRequest - (*DebugClearResponse)(nil), // 18: ateapi.DebugClearResponse - (*MintJWTRequest)(nil), // 19: ateapi.MintJWTRequest - (*MintJWTResponse)(nil), // 20: ateapi.MintJWTResponse - (*MintCertRequest)(nil), // 21: ateapi.MintCertRequest - (*MintCertResponse)(nil), // 22: ateapi.MintCertResponse + (Actor_Status)(0), // 0: ateapi.Actor.Status + (*Actor)(nil), // 1: ateapi.Actor + (*GetActorRequest)(nil), // 2: ateapi.GetActorRequest + (*GetActorResponse)(nil), // 3: ateapi.GetActorResponse + (*CreateActorRequest)(nil), // 4: ateapi.CreateActorRequest + (*CreateActorResponse)(nil), // 5: ateapi.CreateActorResponse + (*SuspendActorRequest)(nil), // 6: ateapi.SuspendActorRequest + (*SuspendActorResponse)(nil), // 7: ateapi.SuspendActorResponse + (*ResumeActorRequest)(nil), // 8: ateapi.ResumeActorRequest + (*ResumeActorResponse)(nil), // 9: ateapi.ResumeActorResponse + (*DeleteActorRequest)(nil), // 10: ateapi.DeleteActorRequest + (*DeleteActorResponse)(nil), // 11: ateapi.DeleteActorResponse + (*ListWorkersRequest)(nil), // 12: ateapi.ListWorkersRequest + (*ListWorkersResponse)(nil), // 13: ateapi.ListWorkersResponse + (*ListActorsRequest)(nil), // 14: ateapi.ListActorsRequest + (*ListActorsResponse)(nil), // 15: ateapi.ListActorsResponse + (*Worker)(nil), // 16: ateapi.Worker + (*DebugClearRequest)(nil), // 17: ateapi.DebugClearRequest + (*DebugClearResponse)(nil), // 18: ateapi.DebugClearResponse + (*WatchCapacityPressureRequest)(nil), // 19: ateapi.WatchCapacityPressureRequest + (*CapacityPressureEvent)(nil), // 20: ateapi.CapacityPressureEvent + (*MintJWTRequest)(nil), // 21: ateapi.MintJWTRequest + (*MintJWTResponse)(nil), // 22: ateapi.MintJWTResponse + (*MintCertRequest)(nil), // 23: ateapi.MintCertRequest + (*MintCertResponse)(nil), // 24: ateapi.MintCertResponse } var file_ateapi_proto_depIdxs = []int32{ 0, // 0: ateapi.Actor.status:type_name -> ateapi.Actor.Status @@ -1460,20 +1558,22 @@ var file_ateapi_proto_depIdxs = []int32{ 12, // 12: ateapi.Control.ListWorkers:input_type -> ateapi.ListWorkersRequest 14, // 13: ateapi.Control.ListActors:input_type -> ateapi.ListActorsRequest 17, // 14: ateapi.Control.DebugClear:input_type -> ateapi.DebugClearRequest - 19, // 15: ateapi.SessionIdentity.MintJWT:input_type -> ateapi.MintJWTRequest - 21, // 16: ateapi.SessionIdentity.MintCert:input_type -> ateapi.MintCertRequest - 3, // 17: ateapi.Control.GetActor:output_type -> ateapi.GetActorResponse - 5, // 18: ateapi.Control.CreateActor:output_type -> ateapi.CreateActorResponse - 7, // 19: ateapi.Control.SuspendActor:output_type -> ateapi.SuspendActorResponse - 9, // 20: ateapi.Control.ResumeActor:output_type -> ateapi.ResumeActorResponse - 11, // 21: ateapi.Control.DeleteActor:output_type -> ateapi.DeleteActorResponse - 13, // 22: ateapi.Control.ListWorkers:output_type -> ateapi.ListWorkersResponse - 15, // 23: ateapi.Control.ListActors:output_type -> ateapi.ListActorsResponse - 18, // 24: ateapi.Control.DebugClear:output_type -> ateapi.DebugClearResponse - 20, // 25: ateapi.SessionIdentity.MintJWT:output_type -> ateapi.MintJWTResponse - 22, // 26: ateapi.SessionIdentity.MintCert:output_type -> ateapi.MintCertResponse - 17, // [17:27] is the sub-list for method output_type - 7, // [7:17] is the sub-list for method input_type + 19, // 15: ateapi.Control.WatchCapacityPressure:input_type -> ateapi.WatchCapacityPressureRequest + 21, // 16: ateapi.SessionIdentity.MintJWT:input_type -> ateapi.MintJWTRequest + 23, // 17: ateapi.SessionIdentity.MintCert:input_type -> ateapi.MintCertRequest + 3, // 18: ateapi.Control.GetActor:output_type -> ateapi.GetActorResponse + 5, // 19: ateapi.Control.CreateActor:output_type -> ateapi.CreateActorResponse + 7, // 20: ateapi.Control.SuspendActor:output_type -> ateapi.SuspendActorResponse + 9, // 21: ateapi.Control.ResumeActor:output_type -> ateapi.ResumeActorResponse + 11, // 22: ateapi.Control.DeleteActor:output_type -> ateapi.DeleteActorResponse + 13, // 23: ateapi.Control.ListWorkers:output_type -> ateapi.ListWorkersResponse + 15, // 24: ateapi.Control.ListActors:output_type -> ateapi.ListActorsResponse + 18, // 25: ateapi.Control.DebugClear:output_type -> ateapi.DebugClearResponse + 20, // 26: ateapi.Control.WatchCapacityPressure:output_type -> ateapi.CapacityPressureEvent + 22, // 27: ateapi.SessionIdentity.MintJWT:output_type -> ateapi.MintJWTResponse + 24, // 28: ateapi.SessionIdentity.MintCert:output_type -> ateapi.MintCertResponse + 18, // [18:29] is the sub-list for method output_type + 7, // [7:18] is the sub-list for method input_type 7, // [7:7] is the sub-list for extension type_name 7, // [7:7] is the sub-list for extension extendee 0, // [0:7] is the sub-list for field type_name @@ -1490,7 +1590,7 @@ func file_ateapi_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_ateapi_proto_rawDesc), len(file_ateapi_proto_rawDesc)), NumEnums: 1, - NumMessages: 22, + NumMessages: 24, NumExtensions: 0, NumServices: 2, }, diff --git a/pkg/proto/ateapipb/ateapi.proto b/pkg/proto/ateapipb/ateapi.proto index 8cce2b41c..0f8541352 100644 --- a/pkg/proto/ateapipb/ateapi.proto +++ b/pkg/proto/ateapipb/ateapi.proto @@ -46,6 +46,11 @@ service Control { // Debugging: drop all data from the ate database. rpc DebugClear(DebugClearRequest) returns (DebugClearResponse) {} + + // Stream a notification whenever a resume finds no free worker in a pool — a + // capacity-pressure signal sensed at the request edge. The autoscaler + // subscribes so it can react immediately instead of waiting for its poll. + rpc WatchCapacityPressure(WatchCapacityPressureRequest) returns (stream CapacityPressureEvent) {} } message Actor { @@ -168,6 +173,15 @@ message DebugClearRequest {} message DebugClearResponse {} +message WatchCapacityPressureRequest {} + +// CapacityPressureEvent signals that a worker pool had no free worker for a +// resume, identifying the pool so a consumer can react per-pool. +message CapacityPressureEvent { + string worker_namespace = 1; + string worker_pool = 2; +} + // SessionIdentity allows substrate workloads to exchange their // infrastructure-level credentials (k8s service account token, etc.) for a // substrate session-level credential. A given substrate session might migrate diff --git a/pkg/proto/ateapipb/ateapi_grpc.pb.go b/pkg/proto/ateapipb/ateapi_grpc.pb.go index c79b0cbff..d6cb7a380 100644 --- a/pkg/proto/ateapipb/ateapi_grpc.pb.go +++ b/pkg/proto/ateapipb/ateapi_grpc.pb.go @@ -35,14 +35,15 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - Control_GetActor_FullMethodName = "/ateapi.Control/GetActor" - Control_CreateActor_FullMethodName = "/ateapi.Control/CreateActor" - Control_SuspendActor_FullMethodName = "/ateapi.Control/SuspendActor" - Control_ResumeActor_FullMethodName = "/ateapi.Control/ResumeActor" - Control_DeleteActor_FullMethodName = "/ateapi.Control/DeleteActor" - Control_ListWorkers_FullMethodName = "/ateapi.Control/ListWorkers" - Control_ListActors_FullMethodName = "/ateapi.Control/ListActors" - Control_DebugClear_FullMethodName = "/ateapi.Control/DebugClear" + Control_GetActor_FullMethodName = "/ateapi.Control/GetActor" + Control_CreateActor_FullMethodName = "/ateapi.Control/CreateActor" + Control_SuspendActor_FullMethodName = "/ateapi.Control/SuspendActor" + Control_ResumeActor_FullMethodName = "/ateapi.Control/ResumeActor" + Control_DeleteActor_FullMethodName = "/ateapi.Control/DeleteActor" + Control_ListWorkers_FullMethodName = "/ateapi.Control/ListWorkers" + Control_ListActors_FullMethodName = "/ateapi.Control/ListActors" + Control_DebugClear_FullMethodName = "/ateapi.Control/DebugClear" + Control_WatchCapacityPressure_FullMethodName = "/ateapi.Control/WatchCapacityPressure" ) // ControlClient is the client API for Control service. @@ -67,6 +68,10 @@ type ControlClient interface { ListActors(ctx context.Context, in *ListActorsRequest, opts ...grpc.CallOption) (*ListActorsResponse, error) // Debugging: drop all data from the ate database. DebugClear(ctx context.Context, in *DebugClearRequest, opts ...grpc.CallOption) (*DebugClearResponse, error) + // Stream a notification whenever a resume finds no free worker in a pool — a + // capacity-pressure signal sensed at the request edge. The autoscaler + // subscribes so it can react immediately instead of waiting for its poll. + WatchCapacityPressure(ctx context.Context, in *WatchCapacityPressureRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[CapacityPressureEvent], error) } type controlClient struct { @@ -157,6 +162,25 @@ func (c *controlClient) DebugClear(ctx context.Context, in *DebugClearRequest, o return out, nil } +func (c *controlClient) WatchCapacityPressure(ctx context.Context, in *WatchCapacityPressureRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[CapacityPressureEvent], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Control_ServiceDesc.Streams[0], Control_WatchCapacityPressure_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[WatchCapacityPressureRequest, CapacityPressureEvent]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Control_WatchCapacityPressureClient = grpc.ServerStreamingClient[CapacityPressureEvent] + // ControlServer is the server API for Control service. // All implementations must embed UnimplementedControlServer // for forward compatibility. @@ -179,6 +203,10 @@ type ControlServer interface { ListActors(context.Context, *ListActorsRequest) (*ListActorsResponse, error) // Debugging: drop all data from the ate database. DebugClear(context.Context, *DebugClearRequest) (*DebugClearResponse, error) + // Stream a notification whenever a resume finds no free worker in a pool — a + // capacity-pressure signal sensed at the request edge. The autoscaler + // subscribes so it can react immediately instead of waiting for its poll. + WatchCapacityPressure(*WatchCapacityPressureRequest, grpc.ServerStreamingServer[CapacityPressureEvent]) error mustEmbedUnimplementedControlServer() } @@ -213,6 +241,9 @@ func (UnimplementedControlServer) ListActors(context.Context, *ListActorsRequest func (UnimplementedControlServer) DebugClear(context.Context, *DebugClearRequest) (*DebugClearResponse, error) { return nil, status.Error(codes.Unimplemented, "method DebugClear not implemented") } +func (UnimplementedControlServer) WatchCapacityPressure(*WatchCapacityPressureRequest, grpc.ServerStreamingServer[CapacityPressureEvent]) error { + return status.Error(codes.Unimplemented, "method WatchCapacityPressure not implemented") +} func (UnimplementedControlServer) mustEmbedUnimplementedControlServer() {} func (UnimplementedControlServer) testEmbeddedByValue() {} @@ -378,6 +409,17 @@ func _Control_DebugClear_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _Control_WatchCapacityPressure_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(WatchCapacityPressureRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ControlServer).WatchCapacityPressure(m, &grpc.GenericServerStream[WatchCapacityPressureRequest, CapacityPressureEvent]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Control_WatchCapacityPressureServer = grpc.ServerStreamingServer[CapacityPressureEvent] + // Control_ServiceDesc is the grpc.ServiceDesc for Control service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -418,7 +460,13 @@ var Control_ServiceDesc = grpc.ServiceDesc{ Handler: _Control_DebugClear_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "WatchCapacityPressure", + Handler: _Control_WatchCapacityPressure_Handler, + ServerStreams: true, + }, + }, Metadata: "ateapi.proto", } From bc5b9f03eca55e22cc1865383d92fbf134ea3cc5 Mon Sep 17 00:00:00 2001 From: Omer Yahud Date: Wed, 10 Jun 2026 14:55:49 +0300 Subject: [PATCH 5/7] feat(autoscaler): trigger reactive scale-up on capacity pressure Subscribe the WorkerPool autoscaler to ateapi's WatchCapacityPressure stream and turn each event into an immediate reconcile of that pool via a controller-runtime source.Channel, so an empty pool scales up at the miss instead of at the next poll. The periodic requeue remains the down-path and safety net. The stream watcher runs as a manager runnable and reconnects on error without crashing the manager. Part of #198 (Phase 3b). --- internal/controllers/workerpool_autoscaler.go | 80 ++++++++++++++++++- .../controllers/workerpool_autoscaler_test.go | 54 ++++++++++++- 2 files changed, 132 insertions(+), 2 deletions(-) diff --git a/internal/controllers/workerpool_autoscaler.go b/internal/controllers/workerpool_autoscaler.go index d07311e7a..8d3dc20c9 100644 --- a/internal/controllers/workerpool_autoscaler.go +++ b/internal/controllers/workerpool_autoscaler.go @@ -21,18 +21,27 @@ import ( "time" k8errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" "github.com/agent-substrate/substrate/internal/autoscaler" atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" ) +// pressureReconnectDelay is how long the capacity-pressure watcher waits before +// re-opening the stream after it ends or errors. +const pressureReconnectDelay = 2 * time.Second + // defaultAutoscaleInterval is how often each autoscaled pool is re-evaluated // when nothing else triggers a reconcile. Occupancy lives in ateapi rather than // in Kubernetes, so the loop polls on this cadence instead of waking on events. @@ -61,6 +70,10 @@ type WorkerPoolAutoscaler struct { // is safe: it merely restarts the (conservative) down timer. mu sync.Mutex downSince map[types.NamespacedName]time.Time + + // pressureEvents carries capacity-pressure notifications from ateapi into the + // controller's workqueue as immediate reconciles (the reactive up-path). + pressureEvents chan event.GenericEvent } //+kubebuilder:rbac:groups=ate.dev,resources=workerpools,verbs=get;list;watch;update;patch @@ -192,7 +205,8 @@ func (r *WorkerPoolAutoscaler) forget(key types.NamespacedName) { // SetupWithManager registers the autoscaler. It uses a distinct controller name // (WorkerPoolReconciler also watches WorkerPool) and a generation predicate so -// status-only writes don't wake it — periodic requeue drives the polling. +// status-only writes don't wake the periodic path. A second event source — +// fed by the capacity-pressure watcher — supplies the reactive up-path. func (r *WorkerPoolAutoscaler) SetupWithManager(mgr ctrl.Manager) error { r.mu.Lock() if r.downSince == nil { @@ -200,8 +214,72 @@ func (r *WorkerPoolAutoscaler) SetupWithManager(mgr ctrl.Manager) error { } r.mu.Unlock() + // Reactive up-path: ateapi streams a capacity-pressure event the instant a + // pool has no free worker; the watcher turns each into an immediate reconcile + // of that pool. The For() generation predicate does not apply to this source, + // so pressure always enqueues. The periodic requeue remains the slow path. + r.pressureEvents = make(chan event.GenericEvent, 128) + if err := mgr.Add(manager.RunnableFunc(r.watchCapacityPressure)); err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&atev1alpha1.WorkerPool{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + WatchesRawSource(source.Channel(r.pressureEvents, &handler.EnqueueRequestForObject{})). Named("workerpool-autoscaler"). Complete(r) } + +// watchCapacityPressure subscribes to ateapi's capacity-pressure stream and +// enqueues an immediate reconcile for each pool that misses. It runs for the +// life of the manager, reconnecting (with a short delay) whenever the stream +// ends or errors. It only returns when the manager context is cancelled, so a +// failing stream never takes the manager down. +func (r *WorkerPoolAutoscaler) watchCapacityPressure(ctx context.Context) error { + log := log.FromContext(ctx).WithName("capacity-pressure-watch") + for { + if ctx.Err() != nil { + return nil + } + stream, err := r.AteClient.WatchCapacityPressure(ctx, &ateapipb.WatchCapacityPressureRequest{}) + if err != nil { + log.V(1).Info("capacity-pressure stream open failed; will retry", "err", err) + if !sleep(ctx, pressureReconnectDelay) { + return nil + } + continue + } + + for { + ev, err := stream.Recv() + if err != nil { + log.V(1).Info("capacity-pressure stream ended; will reconnect", "err", err) + break + } + select { + case r.pressureEvents <- event.GenericEvent{Object: &atev1alpha1.WorkerPool{ + ObjectMeta: metav1.ObjectMeta{Namespace: ev.GetWorkerNamespace(), Name: ev.GetWorkerPool()}, + }}: + case <-ctx.Done(): + return nil + } + } + + if !sleep(ctx, pressureReconnectDelay) { + return nil + } + } +} + +// sleep waits for d or until ctx is cancelled. It reports whether the full +// delay elapsed (true) versus being cut short by cancellation (false). +func sleep(ctx context.Context, d time.Duration) bool { + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return false + case <-t.C: + return true + } +} diff --git a/internal/controllers/workerpool_autoscaler_test.go b/internal/controllers/workerpool_autoscaler_test.go index f5bc761bf..4a638dec1 100644 --- a/internal/controllers/workerpool_autoscaler_test.go +++ b/internal/controllers/workerpool_autoscaler_test.go @@ -16,12 +16,14 @@ package controllers import ( "context" + "io" "testing" "time" "google.golang.org/grpc" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/event" "github.com/agent-substrate/substrate/internal/autoscaler" atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" @@ -35,13 +37,63 @@ func ptrInt32(v int32) *int32 { return &v } // nil embedded interface, which keeps the stub honest about what it relies on. type stubControl struct { ateapipb.ControlClient - workers []*ateapipb.Worker + workers []*ateapipb.Worker + pressure <-chan *ateapipb.CapacityPressureEvent } func (s *stubControl) ListWorkers(context.Context, *ateapipb.ListWorkersRequest, ...grpc.CallOption) (*ateapipb.ListWorkersResponse, error) { return &ateapipb.ListWorkersResponse{Workers: s.workers}, nil } +func (s *stubControl) WatchCapacityPressure(ctx context.Context, _ *ateapipb.WatchCapacityPressureRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[ateapipb.CapacityPressureEvent], error) { + return &stubPressureStream{ctx: ctx, events: s.pressure}, nil +} + +// stubPressureStream is a minimal server-streaming client backed by a channel. +type stubPressureStream struct { + grpc.ClientStream + ctx context.Context + events <-chan *ateapipb.CapacityPressureEvent +} + +func (s *stubPressureStream) Recv() (*ateapipb.CapacityPressureEvent, error) { + select { + case e, ok := <-s.events: + if !ok { + return nil, io.EOF + } + return e, nil + case <-s.ctx.Done(): + return nil, s.ctx.Err() + } +} + +// TestAutoscalerCapacityPressureTriggersReconcile checks that a streamed +// capacity-pressure event is turned into an immediate reconcile request for the +// named pool (the reactive up-path). +func TestAutoscalerCapacityPressureTriggersReconcile(t *testing.T) { + events := make(chan *ateapipb.CapacityPressureEvent, 1) + r := &WorkerPoolAutoscaler{ + AteClient: &stubControl{pressure: events}, + pressureEvents: make(chan event.GenericEvent, 1), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { _ = r.watchCapacityPressure(ctx) }() + + events <- &ateapipb.CapacityPressureEvent{WorkerNamespace: "ns", WorkerPool: "pool"} + + select { + case ev := <-r.pressureEvents: + if ev.Object.GetNamespace() != "ns" || ev.Object.GetName() != "pool" { + t.Fatalf("reconcile event for %s/%s, want ns/pool", ev.Object.GetNamespace(), ev.Object.GetName()) + } + case <-time.After(2 * time.Second): + t.Fatal("capacity-pressure event did not produce a reconcile request") + } +} + // poolWorkers builds `total` workers for a pool, the first `occupied` of which // carry an actor (the rest are free/idle). func poolWorkers(ns, pool string, total, occupied int) []*ateapipb.Worker { From 10c8fdf5d117d652213ffc6a8221cd3e9f86bb40 Mon Sep 17 00:00:00 2001 From: Omer Yahud Date: Thu, 11 Jun 2026 00:03:03 +0300 Subject: [PATCH 6/7] docs(demo): add WorkerPool autoscaling demo Add demos/autoscaling: an autoscaled WorkerPool manifest (minReady, targetBuffer, maxReplicas), a scenario driver (demo.sh) that exercises reactive scale-up and hysteretic scale-down via kubectl-ate, and a README walkthrough. Part of #198. --- demos/autoscaling/README.md | 100 +++++++++++++++++ demos/autoscaling/autoscaling.yaml.tmpl | 68 ++++++++++++ demos/autoscaling/demo.sh | 140 ++++++++++++++++++++++++ 3 files changed, 308 insertions(+) create mode 100644 demos/autoscaling/README.md create mode 100644 demos/autoscaling/autoscaling.yaml.tmpl create mode 100755 demos/autoscaling/demo.sh diff --git a/demos/autoscaling/README.md b/demos/autoscaling/README.md new file mode 100644 index 000000000..7c2768bc8 --- /dev/null +++ b/demos/autoscaling/README.md @@ -0,0 +1,100 @@ +# WorkerPool autoscaling demo + +Demonstrates demand-reactive autoscaling of a `WorkerPool` ([issue #198]) on a +running ate cluster: + +- **Reactive scale-up** — when a resume finds no free worker, ateapi emits a + capacity-pressure signal and the autoscaler raises `spec.replicas` + *immediately*, at the request edge — not at its next poll. +- **Hysteretic scale-down** — once idle workers are in surplus, the pool shrinks + back to `minReady`, but only after a stabilization window, so a brief lull + never discards warm capacity. + +The pool keeps a small warm **buffer** (`targetBuffer`) of idle workers so a +burst is served by capacity that *already exists*; the autoscaler's job is to +refill that buffer fast and trim it slowly. + +[issue #198]: https://github.com/agent-substrate/substrate/issues/198 + +## What gets deployed + +`autoscaling.yaml.tmpl` creates the `ate-demo-autoscaling` namespace, an +autoscaled `WorkerPool`, and an `ActorTemplate` that reuses the `counter` +workload (the workload is irrelevant — the focus is the pool): + +```yaml +spec: + replicas: 2 # starting point; the autoscaler owns it from here + minReady: 2 # reservation floor — never scale below 2 warm workers + targetBuffer: 2 # keep ~2 idle workers ready to absorb a burst + maxReplicas: 8 # ceiling +``` + +## Prerequisites + +- An ate cluster **deployed from this branch** (`worktree-wp-autoscaling`), so + `ate-controller` runs the WorkerPool autoscaler and the CRD carries the new + `minReady` / `targetBuffer` / `maxReplicas` fields. A kind cluster via + `hack/install-ate-kind.sh` works. +- `KO_DOCKER_REPO` and `BUCKET_NAME` set (same as the other demos — `ko` + resolves the `ko://` images and `BUCKET_NAME` is the GCS bucket for actor + snapshots). +- The `kubectl-ate` plugin available as `kubectl ate` (or run the demo with + `ATE=./bin/kubectl-ate ./demo.sh`). If your CLI needs an explicit API + endpoint, set it the same way you do for other `kubectl ate` calls. + +## Run + +```sh +# From the repo root: +./demos/autoscaling/demo.sh # deploys if needed, then runs the scenario +``` + +Watch it react live in another terminal: + +```sh +kubectl get workerpool autoscaling -n ate-demo-autoscaling -w +kubectl logs -n ate-system deploy/ate-controller -f | grep 'autoscaled WorkerPool' +``` + +The driver also accepts: + +```sh +./demos/autoscaling/demo.sh deploy # just apply the manifest +./demos/autoscaling/demo.sh cleanup # remove the demo actors + manifest +``` + +Tunables (env vars): `N` (actors woken, default 6), `SCALE_DOWN_WAIT` +(seconds to wait for the shrink, default 180), `ATE`, `KO`. + +## What you should see + +1. **Steady** — the pool settles at `DESIRED 2 / REPLICAS 2`: two idle workers + held warm. +2. **Burst** — the demo wakes 6 actors. The first two consume the warm buffer; + the next resume finds no free worker and returns `503`. That miss is the + trigger: `ate-controller` logs `autoscaled WorkerPool` and `spec.replicas` + jumps up toward `occupied + targetBuffer`, capped at `maxReplicas=8`. The + `503`'d resumes succeed on retry once the new workers finish booting (cold + start takes a little while — that delay is *why* the warm buffer exists). +3. **Idle** — the demo suspends all actors, freeing their workers. The buffer is + now in surplus. After the stabilization window (~60s) the pool shrinks back + to the floor, `DESIRED 2`. + +To see the ceiling enforced, run with `N=12 ./demos/autoscaling/demo.sh`: the +pool grows to `maxReplicas=8` and no further, so the excess resumes keep +getting `503` (there is no capacity left to create). + +## How it works + +- ateapi publishes a pool-scoped `CapacityPressureEvent` whenever + `AssignWorkerStep` finds no free worker, exposed via the + `WatchCapacityPressure` streaming RPC. +- `WorkerPoolAutoscaler` (in `ate-controller`) subscribes to that stream and + turns each event into an immediate reconcile of the pool. It also re-evaluates + on a ~10s poll (the scale-down path and a safety net). It reads occupancy via + `ListWorkers`, runs the decision policy, and is the **single writer** of + `spec.replicas`; `WorkerPoolReconciler` still owns the Deployment. + +> Note: `ate-controller` runs without leader election today, so the +> single-writer guarantee assumes a single controller replica. diff --git a/demos/autoscaling/autoscaling.yaml.tmpl b/demos/autoscaling/autoscaling.yaml.tmpl new file mode 100644 index 000000000..092cc4eeb --- /dev/null +++ b/demos/autoscaling/autoscaling.yaml.tmpl @@ -0,0 +1,68 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: Namespace +metadata: + name: ate-demo-autoscaling + +--- + +apiVersion: ate.dev/v1alpha1 +kind: WorkerPool +metadata: + name: autoscaling + namespace: ate-demo-autoscaling +spec: + # replicas becomes the autoscaler's to own once the bounds below are set; + # this is only the starting point. At steady state the autoscaler keeps + # `targetBuffer` idle workers, so a fresh pool settles at minReady warm. + replicas: 2 + ateomImage: ko://github.com/agent-substrate/substrate/cmd/ateom-gvisor + + # --- Autoscaling bounds (the feature this demo exercises, issue #198) --- + # Never scale below 2 warm workers (the reservation floor). + minReady: 2 + # Keep ~2 idle (warm) workers ready to absorb a resume burst. + targetBuffer: 2 + # Ceiling the autoscaler may grow to. + maxReplicas: 8 + +--- + +apiVersion: ate.dev/v1alpha1 +kind: ActorTemplate +metadata: + name: autoscaling + namespace: ate-demo-autoscaling +spec: + runsc: + amd64: + url: "gs://gvisor/releases/nightly/2026-05-19/x86_64/runsc" + sha256Hash: "a397be1abc2420d26bce6c70e6e2ff96c73aaaab929756c56f5e2089ea842b63" + arm64: + url: "gs://gvisor/releases/nightly/2026-05-19/aarch64/runsc" + sha256Hash: "1ba2366ae2efceba166046f51a4104f9261c9cb72c6db8f5b3fe2dc57dea86b9" + pauseImage: "registry.k8s.io/pause:3.10.2@sha256:f548e0e8e3dc1896ca956272154dde3314e8cc4fde0a57577ee9fa1c63f5baf4" + containers: + - name: workload + # Any workload works; the counter demo binary is reused so this demo has no + # source of its own — the focus is the WorkerPool, not the actor. + image: ko://github.com/agent-substrate/substrate/demos/counter + command: ["/ko-app/counter"] + workerPoolRef: + namespace: ate-demo-autoscaling + name: autoscaling + snapshotsConfig: + location: gs://${BUCKET_NAME}/ate-demo-autoscaling/ diff --git a/demos/autoscaling/demo.sh b/demos/autoscaling/demo.sh new file mode 100755 index 000000000..a40ed5242 --- /dev/null +++ b/demos/autoscaling/demo.sh @@ -0,0 +1,140 @@ +#!/usr/bin/env bash + +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Demo: WorkerPool autoscaling (issue #198). +# +# reactive scale-UP — a resume that finds no free worker emits a +# capacity-pressure signal; the autoscaler raises +# spec.replicas immediately (not at the next poll). +# hysteretic scale-DOWN — once the buffer is in surplus, the pool shrinks +# back to minReady, but only after a stabilization +# window so a brief lull never throws away warm workers. +# +# See README.md for prerequisites. Usage: +# ./demo.sh [run] deploy if needed, then run the up + down scenario +# ./demo.sh deploy just apply the manifest +# ./demo.sh cleanup suspend/delete the demo actors and remove the manifest + +set -o errexit -o nounset -o pipefail + +ATE="${ATE:-kubectl ate}" # override e.g. ATE=./bin/kubectl-ate +KO="${KO:-hack/run-tool.sh ko}" # how to resolve ko:// images +NS="ate-demo-autoscaling" +POOL="autoscaling" +TEMPLATE="${NS}/${POOL}" +N="${N:-6}" # actors to wake (drives the burst) +SCALE_DOWN_WAIT="${SCALE_DOWN_WAIT:-180}" # seconds to wait for hysteretic shrink +ROOT="$(cd "$(dirname "$0")/../.." && pwd)" +TMPL="${ROOT}/demos/autoscaling/autoscaling.yaml.tmpl" + +banner() { printf '\n\033[1;36m== %s ==\033[0m\n' "$*"; } +pool() { kubectl get workerpool "$POOL" -n "$NS"; } +desired() { kubectl get workerpool "$POOL" -n "$NS" -o jsonpath='{.spec.replicas}'; } + +render() { + : "${BUCKET_NAME:?set BUCKET_NAME (GCS bucket for actor snapshots)}" + sed "s|\${BUCKET_NAME}|${BUCKET_NAME}|g" "$TMPL" +} + +deploy() { + banner "Deploying the autoscaled WorkerPool + ActorTemplate" + render | ( cd "$ROOT" && $KO apply -f - ) + kubectl rollout status "deployment/${POOL}-deployment" -n "$NS" --timeout=300s + kubectl wait --for=condition=Ready "actortemplate/${POOL}" -n "$NS" --timeout=300s +} + +cleanup() { + banner "Cleanup" + for i in $(seq 1 "$N"); do + $ATE suspend actor "demo-${i}" >/dev/null 2>&1 || true + $ATE delete actor "demo-${i}" >/dev/null 2>&1 || true + done + render | kubectl delete --ignore-not-found -f - +} + +# resume_with_retry tolerates the 503 a resume gets when the buffer is empty: +# that miss is exactly what triggers the scale-up, and the retry lands once a +# fresh worker has booted. Bounded so the demo never hangs on slow cold starts. +resume_with_retry() { + local id="$1" tries=0 + until $ATE resume actor "$id" >/dev/null 2>&1; do + tries=$((tries + 1)) + if [ "$tries" -ge 15 ]; then + echo " resume ${id}: still no capacity after ${tries} tries (workers may still be booting)" + return 0 + fi + echo " resume ${id}: no free worker (503) — autoscaler reacting; retrying (${tries})..." + sleep 4 + done + echo " resume ${id}: running" +} + +main() { + if ! kubectl get workerpool "$POOL" -n "$NS" >/dev/null 2>&1; then + deploy + fi + + banner "Initial state — autoscaler holds the warm buffer (minReady=2, targetBuffer=2)" + pool + echo + echo "Tip: watch live in another terminal:" + echo " kubectl get workerpool ${POOL} -n ${NS} -w" + echo " kubectl logs -n ate-system deploy/ate-controller -f | grep 'autoscaled WorkerPool'" + + banner "Burst: waking ${N} actors — drains the buffer and triggers reactive scale-up" + for i in $(seq 1 "$N"); do + $ATE create actor "demo-${i}" -t "$TEMPLATE" >/dev/null 2>&1 || true + done + for i in $(seq 1 "$N"); do + resume_with_retry "demo-${i}" + done + sleep 3 + echo + echo "After the burst (spec.replicas climbed toward occupied + targetBuffer, capped at maxReplicas=8):" + pool + + banner "Idle: suspending all actors — frees workers, buffer goes into surplus" + for i in $(seq 1 "$N"); do + $ATE suspend actor "demo-${i}" >/dev/null 2>&1 || true + done + pool + echo + echo "Scale-down is hysteretic — waiting up to ${SCALE_DOWN_WAIT}s for the stabilization window..." + local deadline + deadline=$(( $(date +%s) + SCALE_DOWN_WAIT )) + while [ "$(date +%s)" -lt "$deadline" ]; do + local d + d="$(desired)" + echo " $(date +%H:%M:%S) spec.replicas=${d}" + if [ "${d:-99}" -le 2 ]; then + echo " shrunk to the reservation floor (minReady=2)." + break + fi + sleep 10 + done + + banner "Done" + pool + echo + echo "Run './demo.sh cleanup' to remove the demo actors and manifest." +} + +case "${1:-run}" in + run) main ;; + deploy) deploy ;; + cleanup) cleanup ;; + *) echo "usage: $0 [run|deploy|cleanup]" >&2; exit 1 ;; +esac From 62df3d26288f7f12cccca4a55a262c623a0ea0ff Mon Sep 17 00:00:00 2001 From: Omer Yahud Date: Thu, 11 Jun 2026 15:21:47 +0300 Subject: [PATCH 7/7] fix demo doc Signed-off-by: Omer Yahud --- demos/autoscaling/README.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/demos/autoscaling/README.md b/demos/autoscaling/README.md index 7c2768bc8..0bd3caa3e 100644 --- a/demos/autoscaling/README.md +++ b/demos/autoscaling/README.md @@ -32,10 +32,8 @@ spec: ## Prerequisites -- An ate cluster **deployed from this branch** (`worktree-wp-autoscaling`), so - `ate-controller` runs the WorkerPool autoscaler and the CRD carries the new - `minReady` / `targetBuffer` / `maxReplicas` fields. A kind cluster via - `hack/install-ate-kind.sh` works. +- An ate cluster deployed +- A kind cluster via `hack/install-ate-kind.sh` works. - `KO_DOCKER_REPO` and `BUCKET_NAME` set (same as the other demos — `ko` resolves the `ko://` images and `BUCKET_NAME` is the GCS bucket for actor snapshots).