diff --git a/.github/workflows/e2e-tests.yaml b/.github/workflows/e2e-tests.yaml index aa6e610..6e968d3 100644 --- a/.github/workflows/e2e-tests.yaml +++ b/.github/workflows/e2e-tests.yaml @@ -36,7 +36,7 @@ jobs: - name: Create k8s Kind Cluster uses: helm/kind-action@v1.10.0 with: - version: v0.32.0 # Define your custom KinD CLI version here + version: v0.32.0 # required for gang node_image: kindest/node:v1.36.1 config: ./deploy/kind-config.yaml @@ -78,6 +78,8 @@ jobs: done [ -n "$POD" ] || { echo "ERROR: no Running fluence pod found"; exit 1; } echo "Using pod: $POD" + # Brief sleep to let the container runtime stabilize before exec + sleep 5 kubectl -n kube-system exec "$POD" -- ls /tmp/ kubectl -n kube-system logs "$POD" kubectl -n kube-system exec "$POD" -- /bin/bash -c "cat /tmp/fluence-graph-*.json" @@ -107,6 +109,8 @@ jobs: done [ -n "$POD" ] || { echo "ERROR: no Running fluence pod found after restart"; exit 1; } echo "Using pod: $POD" + # Brief sleep to let the container runtime stabilize before exec + sleep 5 kubectl -n kube-system exec "$POD" -- /bin/bash -c "cat /tmp/fluence-graph-*.json" - name: Wait for webhook @@ -129,6 +133,9 @@ jobs: #- name: E2E - restart recovery (no double-book) # run: bash test/e2e/03-restart-recovery.sh + - name: E2E - sidecar ungate + run: bash test/e2e/04-sidecar-ungate.sh + - name: Dump diagnostics on failure if: failure() run: | diff --git a/.github/workflows/sidecar-build-deploy.yaml b/.github/workflows/sidecar-build-deploy.yaml new file mode 100644 index 0000000..c11245d --- /dev/null +++ b/.github/workflows/sidecar-build-deploy.yaml @@ -0,0 +1,71 @@ +name: sidecar-build-deploy + +on: + push: + branches: [main] + tags: ["v*"] + paths: + - "sidecars/**" + - ".github/workflows/sidecar-build-deploy.yaml" + pull_request: + branches: [main] + paths: + - "sidecars/**" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + REGISTRY: ghcr.io + +jobs: + build-deploy: + runs-on: 
ubuntu-latest + permissions: + contents: read + packages: write + + strategy: + matrix: + sidecar: + - braket + # - qrmi # uncomment when implemented + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to GHCR + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Image metadata + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ github.repository }}-sidecar-${{ matrix.sidecar }} + tags: | + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=sha + type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }} + + - name: Build and push ${{ matrix.sidecar }} sidecar + uses: docker/build-push-action@v6 + with: + context: . + file: ./sidecars/${{ matrix.sidecar }}/Dockerfile + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/Makefile b/Makefile index df5519f..9613bc5 100644 --- a/Makefile +++ b/Makefile @@ -25,6 +25,12 @@ build: ## Build all binaries (scheduler needs flux-sched; helpers are pure Go) CGO_ENABLED=0 go build -o bin/fluence-deviceplugin ./cmd/deviceplugin CGO_ENABLED=0 go build -o bin/fluence-webhook ./cmd/webhook +.PHONY: sidecars +sidecars: + docker build -f sidecars/braket/Dockerfile -t ghcr.io/converged-computing/fluence-sidecar-braket:latest . 
+ docker push ghcr.io/converged-computing/fluence-sidecar-braket:latest + # kind load docker-image ghcr.io/converged-computing/fluence-sidecar-braket:latest + .PHONY: test test: CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \ diff --git a/README.md b/README.md index 0068322..54870c0 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ native Kubernetes PodGroup (Gang) API — no sidecar, and no proven out first in [fluxion-quantum](https://github.com/converged-computing/fluxion-quantum). -## How the pieces fit together +## How does it work? ```console resources.yaml --+ @@ -80,8 +80,6 @@ virtual resource (`virtual=true`) cannot be co-selected in one match -- one woul prune the other. A pod needing both produces **two** match-allocate calls, held together all-or-nothing. ---- - ## Components ### `pkg/jgf` — JGF graph builder @@ -193,6 +191,39 @@ admission. The real gating is Fluxion (and the backend's own limits); since a virtual backend is reachable from any node, each type is advertised at a large ceiling. Types come from the same config as the graph, so they can't drift. +### `sidecars/` — quantum coordination sidecars + +Vendor-specific sidecar containers injected by the webhook into leader pods +of quantum workflow groups. Each sidecar discovers the QPU task submitted by +the leader, polls the vendor queue, and ungates worker pods when the task +reaches position==1. 
+ +```console +sidecars/ + lib/ungate.py shared gate removal + ARN annotation logic + braket/ + sidecar.py AWS Braket sidecar (tag search, queue polling, ungate) + fluence_braket_intercept.py AwsDevice.run() monkey-patch (PYTHONSTARTUP) + Dockerfile build context is the repo root (`docker build -f sidecars/braket/Dockerfile .`) + design.md full design document + test/integration.sh local integration test (requires AWS credentials) +``` + +The sidecar is injected automatically — users only need the group label: + +```yaml +metadata: + labels: + fluence.flux-framework.org/group: my-workflow +spec: + schedulerName: fluence +``` + +Fluence creates the PodGroup, injects the sidecar, creates per-namespace +RBAC, and gates all non-leader pods. See `sidecars/braket/design.md` for +the full design including the SDK interceptor, queue position polling, and +the two-queue problem motivation. + ### `pkg/webhook` — environment injection A mutating webhook that surfaces scheduler-chosen values to a workload. Container @@ -210,7 +241,9 @@ workload reads these normalized names regardless of which backend it matched. - `cmd/deviceplugin` — the extended-resource DaemonSet. - `cmd/webhook` — the env-injection webhook. - `cmd/recovery-probe` — verifies allocation replay survives a graph rebuild - (what a restart does); see `make test-restore`. Note this was implemented but removed because the code in fluxion is only part of a PR branch, and I feel nervous about depending on it. + (what a restart does); see `make test-restore`. + +Note that the recovery probe (and graph restore) was implemented but removed because the code in fluxion is only part of a PR branch, and I feel nervous about depending on it. ## Configuration @@ -335,6 +368,38 @@ Submission is **not** done by the scheduler — the workload container holds the user's credentials and submits via qrmi-go. Fluence only schedules and hands off the backend. (When we control local quantum devices this will change.) +### 3. 
Quantum workflow groups (leader + workers) + +For workflows where a leader pod submits quantum work and worker pods process +the results, add the group label to all pods. Fluence gates the workers until +the QPU task reaches position==1 in the vendor queue: + +```yaml +# All pods in the group get the same label +metadata: + labels: + fluence.flux-framework.org/group: my-qaoa-workflow +spec: + schedulerName: fluence +``` + +The first pod admitted becomes the leader — Fluence injects the sidecar and +creates a PodGroup with `minCount: 1`. All subsequent pods get a +`quantum.braket/ready` scheduling gate and consume no node resources during +the QPU queue wait. When the sidecar observes `queue_position == 1`, it +patches the task ARN onto each worker pod's annotations and removes their +gates atomically with setting the `fluence-quantum-classical` priority class. + +Per-namespace RBAC (`fluence-sidecar` ServiceAccount/Role/RoleBinding) and +the interceptor ConfigMap are created automatically by the webhook on first +use — no manual setup required. + +```bash +# Applying per-namespace RBAC is NOT needed — the webhook creates it automatically. +# Just apply your pods with the group label and schedulerName: fluence. +kubectl apply -f my-quantum-workflow.yaml +``` + ### Notes - **Deletion hangs.** A PodGroup can hang on delete via finalizers if the workload @@ -350,4 +415,4 @@ See [LICENSE](LICENSE), [COPYRIGHT](COPYRIGHT), and [NOTICE](NOTICE). 
SPDX-License-Identifier: MIT -LLNL-CODE-842614 \ No newline at end of file +LLNL-CODE-842614 diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 20fac0d..eeca700 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -83,7 +83,11 @@ func main() { log.Printf("no resources config at %s (%v); injecting FLUXION_BACKEND only", path, rerr) } } - mutator := &webhook.Mutator{AttributeKeys: attrKeys} + mutator := &webhook.Mutator{ + AttributeKeys: attrKeys, + Client: client, + SidecarImage: env("FLUENCE_SIDECAR_IMAGE", ""), + } log.Printf("[fluence-webhook] env contract injected into fluxion pods: %v", mutator.EnvVarNames()) mux := http.NewServeMux() diff --git a/deploy/fluence-test.yaml b/deploy/fluence-test.yaml index 0eb6f8a..965516c 100644 --- a/deploy/fluence-test.yaml +++ b/deploy/fluence-test.yaml @@ -58,7 +58,7 @@ metadata: rules: - apiGroups: ["scheduling.k8s.io"] resources: ["podgroups", "workloads", "podgroups/status", "workloads/status"] - verbs: ["get", "list", "watch", "update", "patch"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] - apiGroups: ["coordination.k8s.io"] resources: ["leases"] verbs: ["create", "get", "update", "list", "watch"] @@ -72,6 +72,17 @@ rules: - apiGroups: ["admissionregistration.k8s.io"] resources: ["mutatingwebhookconfigurations"] verbs: ["get", "list", "watch", "patch"] + # The webhook creates per-namespace sidecar RBAC on demand when a leader + # pod is admitted, so users do not need to apply RBAC manually. 
+ - apiGroups: [""] + resources: ["serviceaccounts"] + verbs: ["get", "create"] + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "create"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["roles", "rolebindings"] + verbs: ["get", "create"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding @@ -177,6 +188,11 @@ spec: # Allows for kind load imagePullPolicy: Never command: ["/bin/fluence-webhook"] + env: + # Use busybox as sidecar image in tests — avoids pulling the real + # sidecar image which is large and not cached in CI. + - name: FLUENCE_SIDECAR_IMAGE + value: "busybox:latest" ports: - containerPort: 8443 readinessProbe: @@ -222,3 +238,80 @@ webhooks: - key: kubernetes.io/metadata.name operator: NotIn values: ["kube-system"] +--- +# fluence-sidecar.yaml +# +# RBAC and supporting resources for the Fluence quantum sidecar. +# +# The sidecar runs inside a leader pod and needs: +# - patch/annotate on pods in its own namespace (to ungate workers and +# propagate the task ARN annotation) +# +# The sidecar ServiceAccount is namespace-scoped — it only has permissions +# in the namespace where the workflow runs. The webhook sets +# spec.serviceAccountName on the leader pod to fluence-sidecar. +# +# The SDK interceptor ConfigMap holds fluence_braket_intercept.py which +# the webhook mounts into user containers and wires up via PYTHONSTARTUP, +# transparently tagging every device.run() call with the pod UID. +# +# Apply with: +# kubectl apply -f deploy/fluence-sidecar.yaml + + +--- +# PriorityClass for classical pods paired with quantum work. +# Applied to worker pods by the webhook when they are gated. +# When ungated, high priority triggers preemption of lower-priority work +# so workers get nodes immediately as the QPU result arrives. 
+apiVersion: scheduling.k8s.io/v1 +kind: PriorityClass +metadata: + name: fluence-quantum-classical + labels: + app: fluence +value: 1000000 +globalDefault: false +preemptionPolicy: PreemptLowerPriority +description: "High priority for classical pods paired with quantum work. Set by Fluence webhook." +--- +# SDK interceptor ConfigMap — holds the Python startup hook that +# patches AwsDevice.run() to tag every quantum task with the pod UID. +# The webhook mounts this file into containers that request a fluxion +# resource and sets PYTHONSTARTUP to the mounted path. +# +# Mounted at: /etc/fluence/fluence_braket_intercept.py +# NOTE(review): CPython reads PYTHONSTARTUP only in interactive mode — confirm non-interactive scripts load this hook. +apiVersion: v1 +kind: ConfigMap +metadata: + name: fluence-braket-interceptor + namespace: kube-system + labels: + app: fluence +data: + fluence_braket_intercept.py: | + # Injected by the Fluence webhook into every pod requesting a QPU resource. + # Patches AwsDevice.run() to automatically tag every quantum task submission + # with the pod UID, enabling the fluence-sidecar to find the task without + # any user application changes. 
+ import os + + def _install_interceptor(): + try: + from braket.aws import AwsDevice + _original_run = AwsDevice.run + + def _patched_run(self, task_specification, *args, **kwargs): + pod_uid = os.environ.get("FLUENCE_POD_UID", "") + if pod_uid: + tags = dict(kwargs.get("tags") or {})  # copy: never mutate the caller's dict; tolerate an explicit tags=None + tags["fluence-pod-uid"] = pod_uid + kwargs["tags"] = tags + return _original_run(self, task_specification, *args, **kwargs) + + AwsDevice.run = _patched_run + except ImportError: + pass + + _install_interceptor() diff --git a/deploy/fluence.yaml b/deploy/fluence.yaml index 0b66246..5215a59 100644 --- a/deploy/fluence.yaml +++ b/deploy/fluence.yaml @@ -58,7 +58,7 @@ metadata: rules: - apiGroups: ["scheduling.k8s.io"] resources: ["podgroups", "workloads", "podgroups/status", "workloads/status"] - verbs: ["get", "list", "watch", "update", "patch"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] - apiGroups: ["coordination.k8s.io"] resources: ["leases"] verbs: ["create", "get", "update", "list", "watch"] @@ -72,6 +72,17 @@ rules: - apiGroups: ["admissionregistration.k8s.io"] resources: ["mutatingwebhookconfigurations"] verbs: ["get", "list", "watch", "patch"] + # The webhook creates per-namespace sidecar RBAC on demand when a leader + # pod is admitted, so users do not need to apply RBAC manually. + - apiGroups: [""] + resources: ["serviceaccounts"] + verbs: ["get", "create"] + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "create"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["roles", "rolebindings"] + verbs: ["get", "create"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding @@ -218,3 +229,79 @@ webhooks: - key: kubernetes.io/metadata.name operator: NotIn values: ["kube-system"] +# fluence-sidecar.yaml +# +# RBAC and supporting resources for the Fluence quantum sidecar. 
+# +# The sidecar runs inside a leader pod and needs: +# - patch/annotate on pods in its own namespace (to ungate workers and +# propagate the task ARN annotation) +# +# The sidecar ServiceAccount is namespace-scoped — it only has permissions +# in the namespace where the workflow runs. The webhook sets +# spec.serviceAccountName on the leader pod to fluence-sidecar. +# +# The SDK interceptor ConfigMap holds fluence_braket_intercept.py which +# the webhook mounts into user containers and wires up via PYTHONSTARTUP, +# transparently tagging every device.run() call with the pod UID. +# +# Apply with: +# kubectl apply -f deploy/fluence-sidecar.yaml + + +--- +# PriorityClass for classical pods paired with quantum work. +# Applied to worker pods by the webhook when they are gated. +# When ungated, high priority triggers preemption of lower-priority work +# so workers get nodes immediately as the QPU result arrives. +apiVersion: scheduling.k8s.io/v1 +kind: PriorityClass +metadata: + name: fluence-quantum-classical + labels: + app: fluence +value: 1000000 +globalDefault: false +preemptionPolicy: PreemptLowerPriority +description: "High priority for classical pods paired with quantum work. Set by Fluence webhook." +--- +# SDK interceptor ConfigMap — holds the Python startup hook that +# patches AwsDevice.run() to tag every quantum task with the pod UID. +# The webhook mounts this file into containers that request a fluxion +# resource and sets PYTHONSTARTUP to the mounted path. +# +# Mounted at: /etc/fluence/fluence_braket_intercept.py +# NOTE(review): CPython reads PYTHONSTARTUP only in interactive mode — confirm non-interactive scripts load this hook. +apiVersion: v1 +kind: ConfigMap +metadata: + name: fluence-braket-interceptor + namespace: kube-system + labels: + app: fluence +data: + fluence_braket_intercept.py: | + # Injected by the Fluence webhook into every pod requesting a QPU resource. 
+ # Patches AwsDevice.run() to automatically tag every quantum task submission + # with the pod UID, enabling the fluence-sidecar to find the task without + # any user application changes. + import os + + def _install_interceptor(): + try: + from braket.aws import AwsDevice + _original_run = AwsDevice.run + + def _patched_run(self, task_specification, *args, **kwargs): + pod_uid = os.environ.get("FLUENCE_POD_UID", "") + if pod_uid: + tags = dict(kwargs.get("tags") or {})  # copy: never mutate the caller's dict; tolerate an explicit tags=None + tags["fluence-pod-uid"] = pod_uid + kwargs["tags"] = tags + return _original_run(self, task_specification, *args, **kwargs) + + AwsDevice.run = _patched_run + except ImportError: + pass + + _install_interceptor() diff --git a/examples/test/e2e/sidecar-mock-pods.yaml b/examples/test/e2e/sidecar-mock-pods.yaml new file mode 100644 index 0000000..13232f7 --- /dev/null +++ b/examples/test/e2e/sidecar-mock-pods.yaml @@ -0,0 +1,62 @@ +--- +# Leader pod — first admitted, webhook creates PodGroup, injects sidecar, creates RBAC +# User only needs schedulerName: fluence and the quantum-group label. +# No PodGroup object needed — Fluence creates it. 
+apiVersion: v1 +kind: Pod +metadata: + name: sidecar-test-leader + labels: + app: fluence-sidecar-test + fluence.flux-framework.org/group: sidecar-test-group +spec: + schedulerName: fluence + restartPolicy: Never + containers: + - name: mock-quantum-app + image: busybox + command: + - sh + - -c + - | + echo "mock-quantum-app: running" + echo "arn:aws:braket:us-east-1:123456:quantum-task/mock-abc123" \ + > /tmp/task-arn + echo "mock-quantum-app: task ARN written" + sleep 3600 + resources: + requests: + cpu: "100m" + memory: "128Mi" + +--- +# Worker pod — webhook adds scheduling gate automatically +apiVersion: v1 +kind: Pod +metadata: + name: sidecar-test-worker + labels: + app: fluence-sidecar-test + fluence.flux-framework.org/group: sidecar-test-group +spec: + schedulerName: fluence + restartPolicy: Never + containers: + - name: classical-worker + image: busybox + command: + - sh + - -c + - | + echo "classical-worker: started" + echo "TASK_ARN=$BRAKET_TASK_ARN" + sleep 10 + env: + - name: BRAKET_TASK_ARN + valueFrom: + fieldRef: + fieldPath: metadata.annotations['braket.quantum/task-arn'] + resources: + requests: + cpu: "100m" + memory: "128Mi" diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index 17a7b48..59715c6 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -1,17 +1,21 @@ -// Package webhook is fluence's mutating admission webhook. Its job is to make -// scheduler-chosen values reach a pod's containers without the user wiring -// anything. Container env is immutable after a pod is created, so the scheduler -// cannot write it directly; instead this webhook injects, at pod-creation time, -// a downward-API env that reads an annotation the scheduler fills in later -// (during PreBind). The user writes a plain pod; the plumbing is automatic. +// Package webhook is fluence's mutating admission webhook. 
// -// Current rule: for a pod scheduled by fluence whose container requests a -// fluxion.flux-framework.org/* resource, inject QRMI_BACKEND sourced from the -// fluence backend annotation. New mutation rules can be added in Mutate. +// Rules: // -// The webhook also manages its own TLS: it generates a self-signed CA + serving -// certificate at startup and patches its MutatingWebhookConfiguration's caBundle, -// so the install needs no cert-manager and no committed keys. +// 1. For a pod scheduled by fluence whose container requests a +// fluxion.flux-framework.org/* resource, inject FLUXION_* env vars +// sourced from annotations the scheduler writes in PreBind. +// +// 2. Quantum leader/worker split: +// Pods with label fluence.flux-framework.org/group= and +// schedulerName=fluence trigger the split. The first pod admitted +// becomes the leader — Fluence creates a PodGroup (minCount:1), +// injects the sidecar, creates per-namespace RBAC, and records the +// leader on the PodGroup. Every subsequent pod in the same group +// gets a quantum.braket/ready scheduling gate added. +// +// The webhook self-manages TLS via a self-signed CA patched into the +// MutatingWebhookConfiguration caBundle at startup. package webhook import ( @@ -35,37 +39,105 @@ import ( admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + schedulingv1alpha2 "k8s.io/api/scheduling/v1alpha2" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" ) -// SchedulerName is the scheduler whose pods this webhook mutates. 
-const SchedulerName = "fluence" +// ── Constants ────────────────────────────────────────────────────────────────── + +const ( + SchedulerName = "fluence" + QuantumGroupLabel = "fluence.flux-framework.org/group" + QuantumLeaderAnnotation = "fluence.flux-framework.org/quantum-leader" + QuantumGateName = "quantum.braket/ready" + QuantumClassicalPriorityClass = "fluence-quantum-classical" + SidecarImage = "ghcr.io/converged-computing/fluence-sidecar-braket:latest" + SidecarServiceAccount = "fluence-sidecar" + InterceptorConfigMap = "fluence-braket-interceptor" + InterceptorVolumeName = "fluence-braket-interceptor" + InterceptorMountPath = "/etc/fluence/fluence_braket_intercept.py" +) + +// ── Types ────────────────────────────────────────────────────────────────────── -// jsonPatchOp is a single RFC 6902 JSON Patch operation. type jsonPatchOp struct { Op string `json:"op"` Path string `json:"path"` Value any `json:"value,omitempty"` } -// Mutator injects fluence's scheduler-chosen values into a pod's containers. It -// carries the env contract — the union of attribute keys across the configured -// backends — so it injects a stable, predictable set of environment variables -// regardless of which backend a given pod ends up matching. Values flow via the -// downward API from annotations the scheduler writes in PreBind, so the env var -// NAMES are fixed at pod-creation time (here) while their VALUES populate later. type Mutator struct { - // AttributeKeys is the union of user attribute keys across all backends. Each - // becomes a FLUXION_ env var sourced from its attr- annotation. AttributeKeys []string + Client kubernetes.Interface + SidecarImage string +} + +// ── Helpers ──────────────────────────────────────────────────────────────────── + +func (m *Mutator) sidecarImage() string { + if m.SidecarImage != "" { + return m.SidecarImage + } + return SidecarImage +} + +// groupName returns the value of QuantumGroupLabel on the pod, or "". 
+func groupName(pod *corev1.Pod) string { + if pod.Labels == nil { + return "" + } + return pod.Labels[QuantumGroupLabel] +} + +func annotationEnv(envName, annotationKey string) corev1.EnvVar { + return corev1.EnvVar{ + Name: envName, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", annotationKey), + }, + }, + } +} + +func fieldEnv(envName, fieldPath string) corev1.EnvVar { + return corev1.EnvVar{ + Name: envName, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: fieldPath}, + }, + } +} + +func requestsFluxionResource(c corev1.Container) bool { + for name := range c.Resources.Requests { + if strings.HasPrefix(string(name), placement.FluxionResourcePrefix) { + return true + } + } + return false } -// injectedEnv returns the full normalized env set this mutator injects into a -// fluxion-requesting container: FLUXION_BACKEND plus one FLUXION_ per -// configured attribute key. Each reads its annotation via the downward API; an -// annotation the scheduler did not set resolves to empty, which is harmless. +func hasEnv(c corev1.Container, name string) bool { + for _, e := range c.Env { + if e.Name == name { + return true + } + } + return false +} + +func resourceQuantity(s string) *resource.Quantity { + q := resource.MustParse(s) + return &q +} + +// ── Env contract ─────────────────────────────────────────────────────────────── + func (m *Mutator) injectedEnv() []corev1.EnvVar { envs := []corev1.EnvVar{annotationEnv( placement.EnvVarPrefix+"BACKEND", placement.BackendAnnotation)} @@ -76,9 +148,6 @@ func (m *Mutator) injectedEnv() []corev1.EnvVar { return envs } -// EnvVarNames returns the names of every env var this mutator injects, for -// startup logging so the developer sees the exact contract their container can -// rely on. 
func (m *Mutator) EnvVarNames() []string { names := make([]string, 0, len(m.AttributeKeys)+1) for _, e := range m.injectedEnv() { @@ -87,27 +156,261 @@ func (m *Mutator) EnvVarNames() []string { return names } -// annotationEnv builds a downward-API env var that reads a pod annotation. -func annotationEnv(envName, annotationKey string) corev1.EnvVar { - return corev1.EnvVar{ - Name: envName, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: fmt.Sprintf("metadata.annotations['%s']", annotationKey), +// ── PodGroup management ──────────────────────────────────────────────────────── + +func (m *Mutator) podGroupLeader(ctx context.Context, pod *corev1.Pod) string { + if m.Client == nil { + return "" + } + g := groupName(pod) + if g == "" { + return "" + } + // Retry briefly — the leader pod may have just created the PodGroup and + // is recording itself; the worker pod admission may fire concurrently. + for i := 0; i < 3; i++ { + pg, err := m.Client.SchedulingV1alpha2().PodGroups(pod.Namespace).Get( + ctx, g, metav1.GetOptions{}) + if err != nil { + return "" + } + if pg.Annotations != nil && pg.Annotations[QuantumLeaderAnnotation] != "" { + return pg.Annotations[QuantumLeaderAnnotation] + } + if i < 2 { + time.Sleep(100 * time.Millisecond) + } + } + return "" +} + +func (m *Mutator) ensureQuantumPodGroup(ctx context.Context, pod *corev1.Pod, g string) { + if m.Client == nil { + return + } + if _, err := m.Client.SchedulingV1alpha2().PodGroups(pod.Namespace).Get( + ctx, g, metav1.GetOptions{}); err == nil { + return + } + pg := &schedulingv1alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: g, + Namespace: pod.Namespace, + Labels: map[string]string{"app": "fluence", QuantumGroupLabel: g}, + }, + Spec: schedulingv1alpha2.PodGroupSpec{ + SchedulingPolicy: schedulingv1alpha2.PodGroupSchedulingPolicy{ + Gang: &schedulingv1alpha2.GangSchedulingPolicy{MinCount: 1}, + }, + }, + } + if _, err := 
m.Client.SchedulingV1alpha2().PodGroups(pod.Namespace).Create( + ctx, pg, metav1.CreateOptions{}); err != nil { + log.Printf("[fluence-webhook] could not create PodGroup %s/%s: %v", pod.Namespace, g, err) + } else { + log.Printf("[fluence-webhook] created PodGroup %s/%s (minCount=1)", pod.Namespace, g) + } +} + +func (m *Mutator) recordLeader(ctx context.Context, pod *corev1.Pod) { + if m.Client == nil { + return + } + g := groupName(pod) + if g == "" { + return + } + patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, QuantumLeaderAnnotation, pod.Name) + if _, err := m.Client.SchedulingV1alpha2().PodGroups(pod.Namespace).Patch( + ctx, g, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { + log.Printf("[fluence-webhook] could not record leader on PodGroup %s/%s: %v", pod.Namespace, g, err) + } +} + +// ── Per-namespace resource provisioning ─────────────────────────────────────── + +func (m *Mutator) ensureSidecarRBAC(ctx context.Context, namespace string) { + if m.Client == nil { + return + } + + if _, err := m.Client.CoreV1().ServiceAccounts(namespace).Get( + ctx, SidecarServiceAccount, metav1.GetOptions{}); err != nil { + sa := &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{Name: SidecarServiceAccount, Namespace: namespace, + Labels: map[string]string{"app": "fluence-sidecar"}}, + } + if _, err := m.Client.CoreV1().ServiceAccounts(namespace).Create(ctx, sa, metav1.CreateOptions{}); err != nil { + log.Printf("[fluence-webhook] could not create ServiceAccount %s/%s: %v", namespace, SidecarServiceAccount, err) + } else { + log.Printf("[fluence-webhook] created ServiceAccount %s/%s", namespace, SidecarServiceAccount) + } + } + + if _, err := m.Client.RbacV1().Roles(namespace).Get( + ctx, SidecarServiceAccount, metav1.GetOptions{}); err != nil { + role := &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{Name: SidecarServiceAccount, Namespace: namespace, + Labels: map[string]string{"app": "fluence-sidecar"}}, + Rules: 
[]rbacv1.PolicyRule{ + {APIGroups: []string{""}, Resources: []string{"pods"}, Verbs: []string{"get", "list", "patch", "update"}}, + {APIGroups: []string{"scheduling.k8s.io"}, Resources: []string{"podgroups"}, Verbs: []string{"get", "list"}}, + }, + } + if _, err := m.Client.RbacV1().Roles(namespace).Create(ctx, role, metav1.CreateOptions{}); err != nil { + log.Printf("[fluence-webhook] could not create Role %s/%s: %v", namespace, SidecarServiceAccount, err) + } else { + log.Printf("[fluence-webhook] created Role %s/%s", namespace, SidecarServiceAccount) + } + } + + if _, err := m.Client.RbacV1().RoleBindings(namespace).Get( + ctx, SidecarServiceAccount, metav1.GetOptions{}); err != nil { + rb := &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{Name: SidecarServiceAccount, Namespace: namespace, + Labels: map[string]string{"app": "fluence-sidecar"}}, + Subjects: []rbacv1.Subject{{Kind: "ServiceAccount", Name: SidecarServiceAccount, Namespace: namespace}}, + RoleRef: rbacv1.RoleRef{APIGroup: "rbac.authorization.k8s.io", Kind: "Role", Name: SidecarServiceAccount}, + } + if _, err := m.Client.RbacV1().RoleBindings(namespace).Create(ctx, rb, metav1.CreateOptions{}); err != nil { + log.Printf("[fluence-webhook] could not create RoleBinding %s/%s: %v", namespace, SidecarServiceAccount, err) + } else { + log.Printf("[fluence-webhook] created RoleBinding %s/%s", namespace, SidecarServiceAccount) + } + } + + // Copy interceptor ConfigMap from kube-system into the pod namespace + if _, err := m.Client.CoreV1().ConfigMaps(namespace).Get( + ctx, InterceptorConfigMap, metav1.GetOptions{}); err != nil { + if src, srcErr := m.Client.CoreV1().ConfigMaps("kube-system").Get( + ctx, InterceptorConfigMap, metav1.GetOptions{}); srcErr != nil { + log.Printf("[fluence-webhook] could not read interceptor ConfigMap from kube-system: %v", srcErr) + } else { + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: InterceptorConfigMap, Namespace: namespace, + Labels: 
map[string]string{"app": "fluence-sidecar"}}, + Data: src.Data, + } + if _, err := m.Client.CoreV1().ConfigMaps(namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil { + log.Printf("[fluence-webhook] could not create interceptor ConfigMap %s/%s: %v", namespace, InterceptorConfigMap, err) + } else { + log.Printf("[fluence-webhook] created interceptor ConfigMap %s/%s", namespace, InterceptorConfigMap) + } + } + } +} + +// ── Patch operation builders ─────────────────────────────────────────────────── + +func quantumWorkerGateOps(pod *corev1.Pod) []jsonPatchOp { + for _, g := range pod.Spec.SchedulingGates { + if g.Name == QuantumGateName { + return nil + } + } + gate := corev1.PodSchedulingGate{Name: QuantumGateName} + if len(pod.Spec.SchedulingGates) == 0 { + return []jsonPatchOp{{Op: "add", Path: "/spec/schedulingGates", Value: []corev1.PodSchedulingGate{gate}}} + } + return []jsonPatchOp{{Op: "add", Path: "/spec/schedulingGates/-", Value: gate}} +} + +func (m *Mutator) sidecarOps(pod *corev1.Pod) []jsonPatchOp { + var ops []jsonPatchOp + + sidecar := corev1.Container{ + Name: "fluence-sidecar", + Image: m.sidecarImage(), + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + fieldEnv("FLUENCE_POD_UID", "metadata.uid"), + fieldEnv("FLUENCE_POD_NAME", "metadata.name"), + fieldEnv("FLUENCE_NAMESPACE", "metadata.namespace"), + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: *resourceQuantity("100m"), + corev1.ResourceMemory: *resourceQuantity("256Mi"), + }, + }, + } + if len(pod.Spec.Containers) == 0 { + ops = append(ops, jsonPatchOp{Op: "add", Path: "/spec/containers", Value: []corev1.Container{sidecar}}) + } else { + ops = append(ops, jsonPatchOp{Op: "add", Path: "/spec/containers/-", Value: sidecar}) + } + + vol := corev1.Volume{ + Name: InterceptorVolumeName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: 
corev1.LocalObjectReference{Name: InterceptorConfigMap}, }, }, } + if len(pod.Spec.Volumes) == 0 { + ops = append(ops, jsonPatchOp{Op: "add", Path: "/spec/volumes", Value: []corev1.Volume{vol}}) + } else { + ops = append(ops, jsonPatchOp{Op: "add", Path: "/spec/volumes/-", Value: vol}) + } + + mount := corev1.VolumeMount{Name: InterceptorVolumeName, MountPath: InterceptorMountPath, + SubPath: "fluence_braket_intercept.py", ReadOnly: true} + startup := corev1.EnvVar{Name: "PYTHONSTARTUP", Value: InterceptorMountPath} + for i, c := range pod.Spec.Containers { + if !requestsFluxionResource(c) { + continue + } + if len(c.VolumeMounts) == 0 { + ops = append(ops, jsonPatchOp{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/volumeMounts", i), Value: []corev1.VolumeMount{mount}}) + } else { + ops = append(ops, jsonPatchOp{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/volumeMounts/-", i), Value: mount}) + } + if !hasEnv(c, "PYTHONSTARTUP") { + if len(c.Env) == 0 { + ops = append(ops, jsonPatchOp{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env", i), Value: []corev1.EnvVar{startup}}) + } else { + ops = append(ops, jsonPatchOp{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env/-", i), Value: startup}) + } + } + } + + if pod.Spec.ServiceAccountName == "" || pod.Spec.ServiceAccountName == "default" { + ops = append(ops, jsonPatchOp{Op: "add", Path: "/spec/serviceAccountName", Value: SidecarServiceAccount}) + } + + return ops +} + +func podUIDOps(pod *corev1.Pod) []jsonPatchOp { + uid := fieldEnv("FLUENCE_POD_UID", "metadata.uid") + var ops []jsonPatchOp + for i, c := range pod.Spec.Containers { + if !requestsFluxionResource(c) || hasEnv(c, "FLUENCE_POD_UID") { + continue + } + if len(c.Env) == 0 { + ops = append(ops, jsonPatchOp{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env", i), Value: []corev1.EnvVar{uid}}) + pod.Spec.Containers[i].Env = []corev1.EnvVar{uid} + } else { + ops = append(ops, jsonPatchOp{Op: "add", Path: 
fmt.Sprintf("/spec/containers/%d/env/-", i), Value: uid}) + pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, uid) + } + } + return ops } -// Mutate returns the JSON Patch operations for a pod, or nil if nothing applies. -// For each container that requests a fluxion.flux-framework.org/* resource, it -// appends every contract env var the container does not already define. -func (m *Mutator) Mutate(pod *corev1.Pod) []jsonPatchOp { +// ── Mutate ───────────────────────────────────────────────────────────────────── + +func (m *Mutator) Mutate(ctx context.Context, pod *corev1.Pod) []jsonPatchOp { if pod.Spec.SchedulerName != SchedulerName { return nil } - contract := m.injectedEnv() + var ops []jsonPatchOp + + // Rule 1: inject FLUXION_* env contract + contract := m.injectedEnv() for i, c := range pod.Spec.Containers { if !requestsFluxionResource(c) { continue @@ -117,46 +420,39 @@ func (m *Mutator) Mutate(pod *corev1.Pod) []jsonPatchOp { continue } if len(c.Env) == 0 { - ops = append(ops, jsonPatchOp{ - Op: "add", - Path: fmt.Sprintf("/spec/containers/%d/env", i), - Value: []corev1.EnvVar{e}, - }) - // Subsequent vars append to the now-existing slice. 
- c.Env = []corev1.EnvVar{e} - continue + ops = append(ops, jsonPatchOp{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env", i), Value: []corev1.EnvVar{e}}) + pod.Spec.Containers[i].Env = []corev1.EnvVar{e} + } else { + ops = append(ops, jsonPatchOp{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env/-", i), Value: e}) + pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, e) } - ops = append(ops, jsonPatchOp{ - Op: "add", - Path: fmt.Sprintf("/spec/containers/%d/env/-", i), - Value: e, - }) - c.Env = append(c.Env, e) } } - return ops -} -func requestsFluxionResource(c corev1.Container) bool { - for name := range c.Resources.Requests { - if strings.HasPrefix(string(name), placement.FluxionResourcePrefix) { - return true - } + // Rule 2: quantum leader/worker split + g := groupName(pod) + if g == "" { + return ops } - return false -} -func hasEnv(c corev1.Container, name string) bool { - for _, e := range c.Env { - if e.Name == name { - return true - } + leader := m.podGroupLeader(ctx, pod) + if leader == "" { + log.Printf("[fluence-webhook] pod %s/%s is quantum leader for group %s", pod.Namespace, pod.Name, g) + m.ensureQuantumPodGroup(ctx, pod, g) + m.ensureSidecarRBAC(ctx, pod.Namespace) + m.recordLeader(ctx, pod) + ops = append(ops, m.sidecarOps(pod)...) + ops = append(ops, podUIDOps(pod)...) + } else { + log.Printf("[fluence-webhook] pod %s/%s is quantum worker (leader=%s)", pod.Namespace, pod.Name, leader) + ops = append(ops, quantumWorkerGateOps(pod)...) } - return false + + return ops } -// Handler is the /mutate endpoint. It always admits the pod (failure to mutate -// must not block creation); it only adds a patch when Mutate returns one. 
+// ── HTTP handler ─────────────────────────────────────────────────────────────── + func (m *Mutator) Handler(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { @@ -168,40 +464,34 @@ func (m *Mutator) Handler(w http.ResponseWriter, r *http.Request) { http.Error(w, "bad admission review", http.StatusBadRequest) return } - resp := &admissionv1.AdmissionResponse{UID: review.Request.UID, Allowed: true} var pod corev1.Pod if err := json.Unmarshal(review.Request.Object.Raw, &pod); err == nil { - if ops := m.Mutate(&pod); len(ops) > 0 { + if ops := m.Mutate(r.Context(), &pod); len(ops) > 0 { if patch, err := json.Marshal(ops); err == nil { pt := admissionv1.PatchTypeJSONPatch resp.Patch = patch resp.PatchType = &pt - log.Printf("[fluence-webhook] injected %d env op(s) into pod %s/%s", - len(ops), pod.Namespace, pod.Name) + log.Printf("[fluence-webhook] injected %d op(s) into pod %s/%s", len(ops), pod.Namespace, pod.Name) } } } - out := admissionv1.AdmissionReview{TypeMeta: review.TypeMeta, Response: resp} w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(out) } -// GenerateCerts returns a self-signed CA (PEM) and a serving cert+key (PEM) valid -// for the given DNS names. The CA PEM is what the apiserver must trust (caBundle). 
+// ── TLS ──────────────────────────────────────────────────────────────────────── + func GenerateCerts(dnsNames []string) (caPEM, certPEM, keyPEM []byte, err error) { caKey, err := rsa.GenerateKey(rand.Reader, 2048) if err != nil { return nil, nil, nil, err } caTmpl := &x509.Certificate{ - SerialNumber: big.NewInt(1), - Subject: pkix.Name{CommonName: "fluence-webhook-ca"}, - NotBefore: time.Now().Add(-time.Hour), - NotAfter: time.Now().AddDate(10, 0, 0), - IsCA: true, - KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature, + SerialNumber: big.NewInt(1), Subject: pkix.Name{CommonName: "fluence-webhook-ca"}, + NotBefore: time.Now().Add(-time.Hour), NotAfter: time.Now().AddDate(10, 0, 0), + IsCA: true, KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature, BasicConstraintsValid: true, } caDER, err := x509.CreateCertificate(rand.Reader, caTmpl, caTmpl, &caKey.PublicKey, caKey) @@ -212,38 +502,29 @@ func GenerateCerts(dnsNames []string) (caPEM, certPEM, keyPEM []byte, err error) if err != nil { return nil, nil, nil, err } - leafKey, err := rsa.GenerateKey(rand.Reader, 2048) if err != nil { return nil, nil, nil, err } leafTmpl := &x509.Certificate{ - SerialNumber: big.NewInt(2), - Subject: pkix.Name{CommonName: dnsNames[0]}, - NotBefore: time.Now().Add(-time.Hour), - NotAfter: time.Now().AddDate(10, 0, 0), - KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment, - ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, - DNSNames: dnsNames, + SerialNumber: big.NewInt(2), Subject: pkix.Name{CommonName: dnsNames[0]}, + NotBefore: time.Now().Add(-time.Hour), NotAfter: time.Now().AddDate(10, 0, 0), + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, DNSNames: dnsNames, } leafDER, err := x509.CreateCertificate(rand.Reader, leafTmpl, caCert, &leafKey.PublicKey, caKey) if err != nil { return nil, nil, nil, err } - caPEM = 
pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caDER}) certPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: leafDER}) keyPEM = pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(leafKey)}) return caPEM, certPEM, keyPEM, nil } -// EnsureCABundle patches the named MutatingWebhookConfiguration so its first -// webhook trusts caPEM. func EnsureCABundle(ctx context.Context, client kubernetes.Interface, configName string, caPEM []byte) error { - patch := fmt.Sprintf( - `[{"op":"replace","path":"/webhooks/0/clientConfig/caBundle","value":%q}]`, - base64.StdEncoding.EncodeToString(caPEM), - ) + patch := fmt.Sprintf(`[{"op":"replace","path":"/webhooks/0/clientConfig/caBundle","value":%q}]`, + base64.StdEncoding.EncodeToString(caPEM)) _, err := client.AdmissionregistrationV1().MutatingWebhookConfigurations().Patch( ctx, configName, types.JSONPatchType, []byte(patch), metav1.PatchOptions{}) return err diff --git a/pkg/webhook/webhook_test.go b/pkg/webhook/webhook_test.go index 6d97e40..05496f0 100644 --- a/pkg/webhook/webhook_test.go +++ b/pkg/webhook/webhook_test.go @@ -1,6 +1,7 @@ package webhook import ( + "context" "testing" "github.com/converged-computing/fluence/pkg/placement" @@ -8,7 +9,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" ) -func qpuPod(scheduler string, presetEnv string) *corev1.Pod { +func qpuPod(scheduler, presetEnv string) *corev1.Pod { c := corev1.Container{ Name: "app", Resources: corev1.ResourceRequirements{ @@ -23,7 +24,20 @@ func qpuPod(scheduler string, presetEnv string) *corev1.Pod { return &corev1.Pod{Spec: corev1.PodSpec{SchedulerName: scheduler, Containers: []corev1.Container{c}}} } -// envNames returns the env var names referenced by a list of add-ops. 
+func cpuPod(scheduler string) *corev1.Pod { + return &corev1.Pod{Spec: corev1.PodSpec{ + SchedulerName: scheduler, + Containers: []corev1.Container{{ + Name: "c", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(1, resource.DecimalSI), + }, + }, + }}, + }} +} + func opEnvNames(ops []jsonPatchOp) []string { var names []string for _, op := range ops { @@ -48,45 +62,71 @@ func contains(names []string, want string) bool { return false } -// With a config-derived contract (region, qubits), a fluxion pod gets -// FLUXION_BACKEND plus one FLUXION_ per attribute key. +func hasGateOp(ops []jsonPatchOp) bool { + for _, op := range ops { + switch v := op.Value.(type) { + case corev1.PodSchedulingGate: + if v.Name == QuantumGateName { + return true + } + case []corev1.PodSchedulingGate: + for _, g := range v { + if g.Name == QuantumGateName { + return true + } + } + } + } + return false +} + +func hasSidecarOp(ops []jsonPatchOp) bool { + for _, op := range ops { + switch v := op.Value.(type) { + case corev1.Container: + if v.Name == "fluence-sidecar" { + return true + } + case []corev1.Container: + for _, c := range v { + if c.Name == "fluence-sidecar" { + return true + } + } + } + } + return false +} + func TestMutateInjectsContract(t *testing.T) { m := &Mutator{AttributeKeys: []string{"region", "qubits"}} - ops := m.Mutate(qpuPod("fluence", "")) + ops := m.Mutate(context.Background(), qpuPod("fluence", "")) names := opEnvNames(ops) - for _, want := range []string{"FLUXION_BACKEND", "FLUXION_REGION", "FLUXION_QUBITS"} { if !contains(names, want) { t.Errorf("missing injected env %q; got %v", want, names) } } - if len(names) != 3 { - t.Errorf("expected exactly 3 env vars, got %v", names) - } } -// With no configured attributes, only FLUXION_BACKEND is injected. 
func TestMutateBackendOnly(t *testing.T) { m := &Mutator{} - names := opEnvNames(m.Mutate(qpuPod("fluence", ""))) + names := opEnvNames(m.Mutate(context.Background(), qpuPod("fluence", ""))) if len(names) != 1 || names[0] != "FLUXION_BACKEND" { t.Fatalf("want [FLUXION_BACKEND], got %v", names) } } -// Non-fluence pods are never mutated. func TestMutateSkipsOtherScheduler(t *testing.T) { m := &Mutator{AttributeKeys: []string{"region"}} - if ops := m.Mutate(qpuPod("default-scheduler", "")); ops != nil { + if ops := m.Mutate(context.Background(), qpuPod("default-scheduler", "")); ops != nil { t.Fatalf("non-fluence pod should not be mutated, got %v", ops) } } -// An env var the container already defines is not re-injected (idempotent / no -// override), while the others still are. func TestMutateRespectsExistingEnv(t *testing.T) { m := &Mutator{AttributeKeys: []string{"region"}} - names := opEnvNames(m.Mutate(qpuPod("fluence", "FLUXION_BACKEND"))) + names := opEnvNames(m.Mutate(context.Background(), qpuPod("fluence", "FLUXION_BACKEND"))) if contains(names, "FLUXION_BACKEND") { t.Errorf("should not re-inject existing FLUXION_BACKEND; got %v", names) } @@ -95,19 +135,13 @@ func TestMutateRespectsExistingEnv(t *testing.T) { } } -// Classical pods (no fluxion resource request) are not mutated. func TestMutateSkipsNonFluxion(t *testing.T) { m := &Mutator{AttributeKeys: []string{"region"}} - p := &corev1.Pod{Spec: corev1.PodSpec{ - SchedulerName: "fluence", - Containers: []corev1.Container{{Name: "c", Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(1, resource.DecimalSI)}}}}, - }} - if ops := m.Mutate(p); ops != nil { + if ops := m.Mutate(context.Background(), cpuPod("fluence")); ops != nil { t.Fatalf("classical pod should not be mutated, got %v", ops) } } -// EnvVarNames reports the full contract for startup logging. 
func TestEnvVarNames(t *testing.T) { m := &Mutator{AttributeKeys: []string{"region", "connectivity"}} names := m.EnvVarNames() @@ -115,3 +149,46 @@ func TestEnvVarNames(t *testing.T) { t.Fatalf("EnvVarNames = %v, want FLUXION_BACKEND first then attrs", names) } } + +// A QPU pod with no group label gets no gate and no sidecar. +func TestMutateQPUSinglePodNoSidecar(t *testing.T) { + m := &Mutator{} + ops := m.Mutate(context.Background(), qpuPod("fluence", "")) + if hasGateOp(ops) { + t.Error("single QPU pod should not get a scheduling gate") + } + if hasSidecarOp(ops) { + t.Error("single QPU pod should not get a sidecar injected") + } +} + +// quantumWorkerGateOps adds the gate to a pod with no existing gates. +func TestQuantumWorkerGateOpsEmpty(t *testing.T) { + pod := qpuPod("fluence", "") + ops := quantumWorkerGateOps(pod) + if !hasGateOp(ops) { + t.Errorf("expected gate op, got %v", ops) + } +} + +// quantumWorkerGateOps is idempotent. +func TestQuantumWorkerGateOpsIdempotent(t *testing.T) { + pod := qpuPod("fluence", "") + pod.Spec.SchedulingGates = []corev1.PodSchedulingGate{{Name: QuantumGateName}} + ops := quantumWorkerGateOps(pod) + if len(ops) != 0 { + t.Errorf("expected no ops when gate already present, got %v", ops) + } +} + +// groupName returns the quantum group label value. +func TestGroupName(t *testing.T) { + pod := qpuPod("fluence", "") + if groupName(pod) != "" { + t.Error("pod without group label should return empty") + } + pod.Labels = map[string]string{QuantumGroupLabel: "my-workflow"} + if groupName(pod) != "my-workflow" { + t.Errorf("expected my-workflow, got %q", groupName(pod)) + } +} diff --git a/sidecars/README.md b/sidecars/README.md new file mode 100644 index 0000000..39fa580 --- /dev/null +++ b/sidecars/README.md @@ -0,0 +1,39 @@ +# Fluence Sidecars + +Each subdirectory contains a sidecar for a specific quantum cloud vendor or +SDK. 
Sidecars are injected automatically by the Fluence mutating webhook into +any pod requesting a QPU resource, based on the `qrmi_type` attribute of the +matched backend. + +## How sidecars work + +When Fluence schedules a pod requesting `fluxion.flux-framework.org/qpu`, the +webhook: + +1. Identifies the matched backend's `qrmi_type` (e.g. `braket-gate`, `braket-ahs`, `qrmi`) +2. Injects the corresponding sidecar container into the pod +3. Injects the SDK interceptor as a Python startup hook (`PYTHONSTARTUP`) +4. Injects `FLUENCE_POD_UID`, `FLUENCE_GATED_PODS`, and other coordination env vars + +The sidecar runs alongside the user's quantum application, discovers the +submitted task using the injected pod UID tag, polls the vendor queue, and +ungates paired classical pods when the quantum task is one position from +executing. + +## Available sidecars + +| Directory | Vendor | qrmi_type | Status | +|---|---|---|---| +| `braket/` | AWS Braket (gate + AHS) | `braket-gate`, `braket-ahs` | Active | +| `qrmi/` | QRMI-compatible backends | `qrmi` | Planned | + +## Adding a new sidecar + +1. Create a new subdirectory: `sidecars/<vendor>/` +2. Implement `sidecar.py` — must discover the task ARN and call the shared + ungating logic in `sidecars/lib/ungate.py` +3. Implement `<vendor>_intercept.py` — patches the vendor SDK's submit method + to tag tasks with `FLUENCE_POD_UID` +4. Add a `Dockerfile` +5. Add the image to `.github/workflows/sidecar-build-deploy.yaml` +6. 
Add an e2e mock test following `test/e2e/04-sidecar-ungate.sh` diff --git a/sidecars/braket/Dockerfile b/sidecars/braket/Dockerfile new file mode 100644 index 0000000..c04f8ad --- /dev/null +++ b/sidecars/braket/Dockerfile @@ -0,0 +1,28 @@ +FROM python:3.11-slim + +LABEL org.opencontainers.image.source="https://github.com/converged-computing/fluence" +LABEL org.opencontainers.image.description="Fluence AWS Braket sidecar — quantum-classical scheduling coordination" + +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + && rm -rf /var/lib/apt/lists/* + +RUN curl -LO "https://dl.k8s.io/release/$(curl -Ls https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" \ + && chmod +x kubectl && mv kubectl /usr/local/bin/ + +WORKDIR /app + +RUN pip install --no-cache-dir \ + amazon-braket-sdk==1.88.0 \ + boto3 + +# Copy shared lib first, then vendor-specific files +# Build context is the repository root, so paths are relative to that +COPY sidecars/lib/ungate.py ./lib/ungate.py +COPY sidecars/braket/sidecar.py . +COPY sidecars/braket/fluence_braket_intercept.py . + +ENV FLUENCE_TASK_DISCOVERY_TIMEOUT=300 +ENV FLUENCE_POLL_INTERVAL=30 + +CMD ["python", "sidecar.py"] diff --git a/sidecars/braket/design.md b/sidecars/braket/design.md new file mode 100644 index 0000000..4a67548 --- /dev/null +++ b/sidecars/braket/design.md @@ -0,0 +1,295 @@ +# Quantum-Classical Scheduling Coordination in Fluence + +## Abstract + +Hybrid quantum-classical workflows submit work to two independent queues: +the Kubernetes scheduler (classical compute) and a QPU vendor API (quantum +execution). Classical pods waste node resources while waiting for QPU queue +results. Fluence's coordination system thus gates classical worker pods until +the QPU task is one position from executing, then releases them with high +priority so they preempt lower-priority work and start immediately as the +QPU result arrives. 
Yes, it could be the case that the one task ahead of it in the queue +takes a long time, but I think this is a better approach than having worker +pods running (and waiting) for a much longer queue. This is only important +when you have gangs, or leader-worker designs where some leader is launching +the quantum work and otherwise the workers would be waiting and doing nothing +(and wasting resources). + +## 1. The Two-Queue Problem + +When a hybrid quantum-classical job runs on Kubernetes, the classical pod +starts immediately and blocks waiting for the QPU result. The QPU task +enters a vendor-managed queue shared across all users. The classical pod +consumes node resources — CPU, memory, potentially GPU — for the entire +duration of the QPU queue wait, which may be minutes to hours on real +hardware. + +This waste scales with concurrency. With N concurrent hybrid jobs, each pod +idles for the full QPU queue wait. On real QPU backends (IQM Garnet, IQM +Emerald) we measure 15–30% classical idle fraction at N=10, rising to over +70% for individual pods at N=20. Wall time scales linearly with concurrency +on real QPUs — submitting 20 jobs takes 5–8× longer than 1 job due to +self-imposed queue depth. + +## 2. Why Existing Mechanisms Are Insufficient + +### 2.1 Fluxion reservations + +Fluxion's backfill reservation policies (EASY, Conservative, Hybrid) compute +a future `time_at` from the internal resource graph — when currently running +classical jobs will finish. They cannot accept an externally-supplied time +derived from a vendor queue. Without a reliable `time_at`, a reservation +degenerates to a pending job. All reservations are cancelled and recomputed +from scratch at the start of every scheduling loop. + +QPU queue time is unknowable in advance. It depends on other +users' submissions, hardware calibration windows, and network latency. +Average task time per QPU cannot be estimated reliably. 
Therefore Fluxion +reservations cannot help with the two-queue problem. I learned that we are +working on "advanced reservations" that are more like a hold, but it is +not clear if that can be merged soon. + +### 2.2 Scheduling gates alone + +A scheduling gate holds a pod out of the scheduling queue entirely, consuming +no node resources. But ungating N pods simultaneously on a busy cluster +creates a race — resources may not be available when ungating occurs, and +the Fluxion graph allocation happens after ungating, not before. Without +priority, ungated pods compete equally with all pending work. + +### 2.3 Preemption alone + +Submitting classical pods with a high PriorityClass causes Kubernetes to +evict lower-priority pods immediately at submit time — during the entire QPU +queue wait — which is worse than the original problem. + +## 3. Design + +The design combines four mechanisms: + +1. **SDK interceptor** — tags every QPU task with the pod UID +2. **Fluence webhook** — gates worker pods, injects sidecar into leader +3. **Sidecar controller** — discovers the QPU task, polls queue position, + ungates workers when position==1 +4. **High-priority ungating** — workers preempt lower-priority work at the + last responsible moment + +### 3.1 User interface + +The user labels all pods in a workflow group with: + +```yaml +metadata: + labels: + fluence.flux-framework.org/group: my-workflow +spec: + schedulerName: fluence +``` + +I initially started with having the user create a PodGroup object, and I found +that annoying. I do not want to require a PodGroup object when a label is easier, +and then I have fine-grained control of what the group looks like. Fluence can handle +everything else automatically. 
+ +The namespace distinction: +- `fluence.flux-framework.org/*` — Fluence scheduler-plugin concerns + (group label, leader annotation, gate name) +- `fluxion.flux-framework.org/*` — Fluxion resource-graph concerns + (extended resource types, backend attribute env vars) + +### 3.2 Webhook behavior + +When the Fluence mutating webhook sees a pod with `schedulerName: fluence` +and `fluence.flux-framework.org/group=<name>`: + +**First pod admitted (leader):** +1. Creates a PodGroup with `minCount: 1` — Fluence owns this PodGroup, + the user never creates it. `minCount: 1` means the leader schedules + immediately without waiting for gated workers. The assumption here is + that this leader is going to submit the quantum work. +2. Records the leader pod name on the PodGroup via `QuantumLeaderAnnotation`. +3. Creates per-namespace RBAC: `fluence-sidecar` ServiceAccount, Role + (patch pods, list PodGroups), RoleBinding. +4. Copies `fluence-braket-interceptor` ConfigMap from `kube-system` into + the pod's namespace (ConfigMap volumes require same-namespace source). +5. Injects `fluence-sidecar` container into the leader pod. +6. Injects `FLUENCE_POD_UID` env var (downward API from `metadata.uid`). +7. Mounts the interceptor ConfigMap and sets `PYTHONSTARTUP` env var so + the interceptor runs automatically before user code. +8. Sets `serviceAccountName: fluence-sidecar`. + +**Subsequent pods (workers):** +1. Reads the PodGroup leader annotation — retries up to 3× with 100ms + delay to handle concurrent admission race. +2. Adds `quantum.braket/ready` scheduling gate — pod enters + `SchedulingGated` state, invisible to Fluxion, consuming no resources. + +### 3.3 Braket SDK interceptor + +I created a consistent sidecar that is going to monitor the queue, and be able +to ungate the worker pods when the task submitted by our pod is at position 1 +(implying it will run soon, and we assume the user wants the classical +gang to run at the same time or slightly sooner). 
Note that it is up to the +user application to orchestrate the leader and workers, and coordination +of the quantum results. A few examples: + +- The worker pods are guaranteed to get an ARN for where the Braket results are in S3, + and this is ensured by the sidecar. So a reasonable approach is for workers to query + that bucket looking for a finished marker. This would not require coordination from + the leader. +- Given communication from the leader to workers, the leader can tell them exactly + when the work is finished, and coordinate what they do with results. + +I ran into the issue of needing to GET the task id from the primary pod from +the sidecar. What I decided on is a very simple injection - the call of the +script to submit the job can take arbitrary tags, and so I wrap that with a configmap +that is in the pythonpath, and ensure the task is tagged with a pod specific UID +that the sidecar also knows. More specifically, the `fluence_braket_intercept.py` script is +mounted via `PYTHONSTARTUP` into every container in the leader pod. It monkey-patches +`AwsDevice.run()` to automatically tag every quantum task submission with `FLUENCE_POD_UID`: + +```python +def _patched_run(self, task_specification, *args, **kwargs): + pod_uid = os.environ.get("FLUENCE_POD_UID", "") + if pod_uid: + tags = kwargs.get("tags", {}) + tags["fluence-pod-uid"] = pod_uid + kwargs["tags"] = tags + return _original_run(self, task_specification, *args, **kwargs) +``` + +This is completely transparent to the user application. No code changes +are required. + +### 3.4 Sidecar controller + +The `fluence-sidecar` container runs alongside the user application in the +leader pod, sharing its AWS credentials and network namespace. + +```console +1. READ FLUXION_ARN, FLUENCE_POD_UID from env + +2. DISCOVER task by tag: + search_quantum_tasks(filters=[ + deviceArn == FLUXION_ARN, + tags:fluence-pod-uid == FLUENCE_POD_UID + ]) + Poll every 10s, timeout after 300s. 
+ On timeout: fall back to time-window heuristic (tasks submitted + after pod start time on the same device). + +3. DISCOVER worker pods: + List pods in namespace with fluence.flux-framework.org/group label + matching this pod's group, having quantum.braket/ready gate present. + +4. POLL task.queue_position() every 30s. + Log position for experiment instrumentation. + +5. WHEN position == "1" OR state == RUNNING: + For each worker pod: + kubectl annotate pod <worker-pod> braket.quantum/task-arn=<task-arn> + kubectl patch pod <worker-pod> --type=json \ + -p='[{"op":"add","path":"/spec/priorityClassName", + "value":"fluence-quantum-classical"}, + {"op":"remove","path":"/spec/schedulingGates/0"}]' + +6. EXIT +``` + +The priority class and gate removal are applied atomically in one patch. +This ensures workers enter the scheduling queue with high priority +immediately, without a window where they are ungated but low-priority. + +### 3.5 Priority and preemption + +The `fluence-quantum-classical` PriorityClass (value: 1,000,000) is applied +by the sidecar at ungate time, not by the webhook at pod creation. Setting +it at creation time causes an admission controller conflict (priority integer +already defaulted to 0). + +When workers are ungated with high priority, Kubernetes preemption evicts +lower-priority pods to make room. Fluence's pod deletion informer catches +these evictions, calls `Cancel(jobid)` in Fluxion, and frees the graph +vertices so Fluxion can allocate them to the incoming high-priority workers. + +### 3.6 Classical allocation follows quantum execution order + +Because each workflow's gate is removed independently when its QPU task +reaches position==1, workflows whose QPU tasks execute earlier get classical +resources earlier — regardless of submission order. A workflow submitted to +a quiet backend gets its classical resources before one submitted earlier to +a busy backend. This aligns classical resource allocation with actual quantum +execution order across heterogeneous backends. 
+ +## 4. Properties + +| Property | Value | +|---|---| +| User code changes required | None | +| User manifest changes required | Add group label + schedulerName | +| Classical resources during QPU wait | Zero (SchedulingGated) | +| QPU queue time estimation needed | No — position==1 is observable | +| Works across heterogeneous backends | Yes — any backend in Fluxion graph | +| Vendor API cooperation needed | No — SDK interceptor handles tagging | + +## 5. Limitations + +### 5.1 Preemption disrupts lower-priority work + +At position==1, workers preempt running lower-priority pods. This work is +re-queued and eventually runs, but there is a disruption cost. A future +design using a `MatchReserveAt(time_at, spec)` Fluxion primitive — where +`time_at` is supplied by the QPU vendor via an ETA or task-start event — +would allow graceful node draining instead of preemption. No current QPU +vendor exposes such an API. + +### 5.2 Non-Braket SDKs + +The interceptor patches `AwsDevice.run()`. IBM Qiskit Runtime, IQM native +SDK, and other vendors require separate interceptors in `sidecars/<vendor>/`. +The pattern is identical; only the SDK entry point differs. We will make +sidecars for different vendor interfaces. + +### 5.3 Single task per workflow + +The sidecar tracks one QPU task ARN per leader pod. Parameter-shift gradient +estimation and other multi-circuit workflows require tracking a set of ARNs. +See the scatter design issue for the proposed extension. + +### 5.4 Namespace-scoped RBAC + +The webhook creates `fluence-sidecar` RBAC in each namespace on first use. +This is correct behavior — the sidecar only needs permissions in its own +namespace. A Helm chart or operator would manage this more cleanly. + +## 6. Future Work + +### 6.1 MatchReserveAt Fluxion primitive + +A new `MatchReserveAt(time_at, spec)` function in the Fluxion Go bindings +would allow an externally-supplied reservation time. 
The sidecar would feed +live QPU queue position into this estimate, enabling graceful node draining +rather than preemption. This requires the C++ reapi `match_allocate_multi` +function to be exposed through the Go bindings with a `starttime` parameter. + +### 6.2 Scatter design + +For workflows with N independent QPU tasks each paired with one classical +pod, an index-based pairing mechanism (`fluence.flux-framework.org/index`) +would allow the sidecar to ungate specific worker pods when their specific +task reaches position==1. See the open scatter design issue. + +### 6.3 Vendor task-start events + +If QPU vendors exposed SNS/EventBridge notifications when a task transitions +from QUEUED to RUNNING, the sidecar could react to events rather than +polling. This would eliminate the 30s polling latency and enable more +precise ungating. + +### 6.4 PostFilter topology-aware preemption + +A custom Fluence `PostFilter` plugin would ask Fluxion which graph vertices +are blocking a high-priority worker pod, then target preemption at exactly +those pods — rather than the default Kubernetes preemption which picks +lowest-priority pods regardless of graph topology. This ensures preemption +always produces a valid Fluxion allocation. diff --git a/sidecars/braket/fluence_braket_intercept.py b/sidecars/braket/fluence_braket_intercept.py new file mode 100644 index 0000000..8afb6c5 --- /dev/null +++ b/sidecars/braket/fluence_braket_intercept.py @@ -0,0 +1,35 @@ +# fluence_braket_intercept.py +# +# Injected by the Fluence webhook into every pod requesting a QPU resource. +# Patches AwsDevice.run() to automatically tag every quantum task submission +# with the pod UID, enabling the fluence-sidecar to find the task without +# any user application changes. +# +# Installed as a Python sitecustomize hook so it runs before any user code. +# The user application requires no changes. 
def _install_interceptor():
    """
    Monkey-patch ``AwsDevice.run`` so every task submission carries a
    ``fluence-pod-uid`` tag.

    The tag value is read from the ``FLUENCE_POD_UID`` environment variable
    each time ``run`` is called; when it is unset or empty the call is
    forwarded unchanged. If ``braket.aws`` cannot be imported nothing is
    patched. Calling this function more than once is a no-op after the
    first successful patch.
    """
    try:
        from braket.aws import AwsDevice
    except ImportError:
        # amazon-braket-sdk not installed in this container — skip
        return

    # Guard against double wrapping when this module is executed twice
    # (e.g. once as a sitecustomize hook and once via an explicit exec()).
    if getattr(AwsDevice.run, "_fluence_patched", False):
        return

    _original_run = AwsDevice.run

    def _patched_run(self, task_specification, *args, **kwargs):
        pod_uid = os.environ.get("FLUENCE_POD_UID", "")
        if pod_uid:
            # Copy rather than mutate the caller's dict, and treat an
            # explicit tags=None the same as "no tags supplied".
            tags = dict(kwargs.get("tags") or {})
            tags["fluence-pod-uid"] = pod_uid
            kwargs["tags"] = tags
        return _original_run(self, task_specification, *args, **kwargs)

    _patched_run._fluence_patched = True
    AwsDevice.run = _patched_run


_install_interceptor()
# Sorts before every real timestamp; stands in for a missing/unusable createdAt.
_NO_TIMESTAMP = datetime.min.replace(tzinfo=timezone.utc)


def _as_utc(value):
    """
    Normalize a task timestamp to a timezone-aware UTC datetime.

    Accepts a ``datetime`` (naive values are taken to be UTC) or an ISO-8601
    string such as ``2024-01-01T00:00:00Z``. Returns None for anything else.
    """
    if isinstance(value, datetime):
        stamp = value
    elif isinstance(value, str) and value:
        try:
            stamp = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
    else:
        return None
    if stamp.tzinfo is None:
        stamp = stamp.replace(tzinfo=timezone.utc)
    return stamp.astimezone(timezone.utc)


def _newest_task_arn(tasks):
    """Return the quantumTaskArn of the most recently created task in a non-empty list."""
    newest = max(tasks, key=lambda t: _as_utc(t.get("createdAt")) or _NO_TIMESTAMP)
    return newest["quantumTaskArn"]


def _poll_for_task(search_once, timeout, retry_interval):
    """
    Call ``search_once()`` until it returns a truthy ARN or ``timeout``
    seconds have elapsed. Exceptions raised by ``search_once`` are logged
    and retried. Returns the ARN, or None on timeout.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            arn = search_once()
            if arn:
                return arn
        except Exception as e:
            log(f"Search error (will retry): {e}")
        # Never sleep past the deadline.
        remaining = deadline - time.time()
        if remaining > 0:
            time.sleep(min(retry_interval, remaining))
    return None


def find_task_by_tag(client, device_arn, pod_uid, timeout, retry_interval=10):
    """
    Search for a Braket task tagged fluence-pod-uid=<pod_uid> on device_arn.

    client:         object exposing search_quantum_tasks(filters=..., maxResults=...)
    device_arn:     device ARN used as the deviceArn filter value
    pod_uid:        value expected in the fluence-pod-uid tag
    timeout:        seconds to keep polling
    retry_interval: seconds between searches (default 10)

    Returns the ARN of the newest matching task, or None on timeout.
    """
    log(f"Searching for task with tag fluence-pod-uid={pod_uid} on {device_arn}")
    # NOTE(review): confirm that SearchQuantumTasks accepts a
    # "tags:<key>" filter name; this code only shows that it is sent.
    filters = [
        {"name": "deviceArn", "operator": "EQUAL", "values": [device_arn]},
        {"name": "tags:fluence-pod-uid", "operator": "EQUAL", "values": [pod_uid]},
    ]

    def search_once():
        response = client.search_quantum_tasks(filters=filters, maxResults=10)
        tasks = response.get("quantumTasks", [])
        # Most recently created task is ours
        return _newest_task_arn(tasks) if tasks else None

    arn = _poll_for_task(search_once, timeout, retry_interval)
    if arn:
        log(f"Found task by tag: {arn}")
        return arn

    log("Task discovery by tag timed out")
    return None


def find_task_by_time_window(client, device_arn, pod_start_ts, timeout, retry_interval=10):
    """
    Fallback: find the most recently created QUEUED task on device_arn that
    was created at or after pod_start_ts. Used when tag-based discovery fails.

    pod_start_ts may be an ISO-8601 string (e.g. "2024-01-01T00:00:00Z") or a
    datetime. Task ``createdAt`` values are normalized the same way before
    comparing, so datetime and string timestamps can be mixed safely.
    NOTE(review): boto3 is expected to return createdAt as a datetime —
    confirm against the Braket API reference.

    Returns the task ARN, or None on timeout or if pod_start_ts is unparseable.
    """
    log(f"Falling back to time-window heuristic (pod_start={pod_start_ts})")
    not_before = _as_utc(pod_start_ts)
    if not_before is None:
        log(f"Cannot parse pod_start_ts={pod_start_ts!r} — skipping time-window heuristic")
        return None

    filters = [
        {"name": "deviceArn", "operator": "EQUAL", "values": [device_arn]},
        {"name": "status", "operator": "EQUAL", "values": ["QUEUED"]},
    ]

    def search_once():
        response = client.search_quantum_tasks(filters=filters, maxResults=50)
        # Keep only tasks created after pod start; tasks without a usable
        # createdAt are excluded.
        candidates = [
            t for t in response.get("quantumTasks", [])
            if (_as_utc(t.get("createdAt")) or _NO_TIMESTAMP) >= not_before
        ]
        return _newest_task_arn(candidates) if candidates else None

    arn = _poll_for_task(search_once, timeout, retry_interval)
    if arn:
        log(f"Found task by time window (heuristic): {arn} "
            f"(WARNING: may not be correct if multiple tasks submitted)")
        return arn

    log("Time-window task discovery timed out")
    return None
def wait_for_position_one(task_arn, poll_interval):
    """
    Poll the Braket task until it is time to ungate the classical pods.

    Returns the task state string as soon as one of these holds:
      * the state is terminal (COMPLETED, FAILED, CANCELLED),
      * the state is RUNNING,
      * queue_position() reports position "1".
    Any exception raised while polling is logged and the poll is retried
    after poll_interval seconds; there is no overall timeout.
    """
    # A fresh event loop is installed before the Braket SDK is imported/used.
    asyncio.set_event_loop(asyncio.new_event_loop())

    from braket.aws import AwsQuantumTask

    log(f"Polling queue position for task {task_arn.split('/')[-1]}")
    last_position = None

    while True:
        try:
            task = AwsQuantumTask(arn=task_arn)
            state = task.state()

            if state in ("COMPLETED", "FAILED", "CANCELLED"):
                log(f"Task reached terminal state: {state} — ungating now")
                return state

            if state == "RUNNING":
                log("Task is RUNNING — ungating classical pods")
                return state

            pos_info = task.queue_position()
            position = pos_info.queue_position

            # Only log when the position changes, to keep the log readable.
            if position != last_position:
                log(f"Queue position: {position} (state={state})")
                last_position = position

            # The position is compared as a string.
            if position == "1":
                log("Queue position is 1 — ungating classical pods")
                return state

        except Exception as e:
            log(f"Queue position poll error (will retry): {e}")

        time.sleep(poll_interval)


# ungate_pods is imported from sidecars/lib/ungate.py


# ── main ───────────────────────────────────────────────────────────────────────

def _env_int(name, default):
    """Read an integer env var; fall back to default when unset, empty or malformed."""
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        log(f"WARNING: {name}={raw!r} is not an integer — using default {default}")
        return default


def _region_for(device_arn):
    """
    Pick the AWS region: the 4th colon-separated ARN field if present and
    non-empty, else AWS_DEFAULT_REGION, else us-east-1.
    """
    fields = device_arn.split(":")
    arn_region = fields[3] if len(fields) > 3 else ""
    return arn_region or os.environ.get("AWS_DEFAULT_REGION") or "us-east-1"


def main():
    """
    Sidecar entry point: discover the quantum task, wait until it is about
    to run, then ungate the sibling classical pods.

    Exits 1 if FLUXION_ARN is unset or no task can be found (pods are still
    ungated in the latter case to avoid a deadlock); exits 0 immediately if
    there are no gated pods.
    """
    pod_uid = os.environ.get("FLUENCE_POD_UID", "")
    pod_name = os.environ.get("FLUENCE_POD_NAME", "")
    device_arn = os.environ.get("FLUXION_ARN", "")
    # Shared helpers keep env parsing identical across vendor sidecars.
    namespace = namespace_from_env()
    gated_pods = gated_pods_from_env()
    discovery_timeout = _env_int("FLUENCE_TASK_DISCOVERY_TIMEOUT", 300)
    poll_interval = _env_int("FLUENCE_POLL_INTERVAL", 30)

    log("Starting fluence-sidecar")
    log(f" pod_uid : {pod_uid}")
    log(f" pod_name : {pod_name}")
    log(f" namespace : {namespace}")
    log(f" device_arn : {device_arn}")
    log(f" gated_pods : {gated_pods}")

    if not device_arn:
        log("ERROR: FLUXION_ARN not set — cannot discover task")
        sys.exit(1)

    if not gated_pods:
        log("No gated pods to ungate — exiting")
        sys.exit(0)

    # Get region from ARN or env
    region = _region_for(device_arn)

    import boto3
    asyncio.set_event_loop(asyncio.new_event_loop())
    client = boto3.client("braket", region_name=region)

    # Lower bound for the fallback heuristic. This is the time the sidecar
    # reached this line, not the pod's creation time, so tasks submitted
    # earlier (e.g. by an init container) fall outside the window.
    pod_start_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # 1. Discover task ARN
    task_arn = find_task_by_tag(client, device_arn, pod_uid, discovery_timeout)

    if not task_arn:
        log("Tag-based discovery failed — trying time-window heuristic")
        task_arn = find_task_by_time_window(
            client, device_arn, pod_start_ts, discovery_timeout
        )

    if not task_arn:
        log("ERROR: could not find quantum task — ungating anyway to avoid deadlock")
        ungate_pods(gated_pods, "", namespace)
        sys.exit(1)

    # 2. Wait for position==1 or RUNNING
    wait_for_position_one(task_arn, poll_interval)

    # 3. Ungate classical pods with task ARN
    ungate_pods(gated_pods, task_arn, namespace)

    log("Done — classical pods ungated")


if __name__ == "__main__":
    main()
Ungating: sidecar removes gate and patches task ARN when position==1 +# +# Usage: +# # With existing cluster and credentials secret already applied: +# bash sidecars/braket/test/integration.sh +# +# # Override defaults: +# NAMESPACE=test BACKEND=sv1 bash sidecars/braket/test/integration.sh +# +# Prerequisites: +# - kubectl configured against a running cluster +# - aws-braket-credentials secret in $NAMESPACE +# - Fluence installed (for schedulerName: fluence to work) +# - fluence-sidecar-braket image built and loaded into cluster +# (or pulled from GHCR) +set -euo pipefail + +NAMESPACE="${NAMESPACE:-default}" +BACKEND="${BACKEND:-sv1}" +SIDECAR_IMAGE="${SIDECAR_IMAGE:-ghcr.io/converged-computing/fluence-sidecar-braket:latest}" +HERE="$(cd "$(dirname "$0")" && pwd)" + +log() { echo "=== [braket-integration] $*"; } +fail() { echo "FAIL: $*" >&2; dump; exit 1; } + +dump() { + echo "----- pods -----" + kubectl get pods -n "$NAMESPACE" -o wide || true + echo "----- gateway logs -----" + kubectl logs -n "$NAMESPACE" integration-gateway -c user-app --tail=50 || true + echo "----- sidecar logs -----" + kubectl logs -n "$NAMESPACE" integration-gateway -c fluence-sidecar --tail=50 || true + echo "----- classical pod -----" + kubectl describe pod -n "$NAMESPACE" integration-classical || true +} + +# Check prerequisites +kubectl get secret aws-braket-credentials -n "$NAMESPACE" > /dev/null 2>&1 \ + || fail "aws-braket-credentials secret not found in namespace $NAMESPACE" + +log "Running braket sidecar integration test" +log " namespace : $NAMESPACE" +log " backend : $BACKEND" +log " image : $SIDECAR_IMAGE" + +# Determine device ARN from backend name +case "$BACKEND" in + sv1) DEVICE_ARN="arn:aws:braket:::device/quantum-simulator/amazon/sv1" ;; + tn1) DEVICE_ARN="arn:aws:braket:::device/quantum-simulator/amazon/tn1" ;; + *) fail "Unknown backend: $BACKEND (use sv1 or tn1 for integration tests)" ;; +esac + +POD_UID="integration-test-$(date +%s)" + +# Clean up any leftover pods 
from a previous run +kubectl delete pod integration-gateway integration-classical \ + -n "$NAMESPACE" --ignore-not-found=true --wait=true 2>/dev/null || true +kubectl delete rolebinding fluence-sidecar-integration \ + -n "$NAMESPACE" --ignore-not-found=true 2>/dev/null || true +kubectl delete role fluence-sidecar-integration \ + -n "$NAMESPACE" --ignore-not-found=true 2>/dev/null || true +kubectl delete serviceaccount fluence-sidecar-integration \ + -n "$NAMESPACE" --ignore-not-found=true 2>/dev/null || true + +# Create RBAC for sidecar to patch pods +kubectl apply -n "$NAMESPACE" -f - << YAML +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: fluence-sidecar-integration + namespace: ${NAMESPACE} +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: fluence-sidecar-integration + namespace: ${NAMESPACE} +rules: + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list", "patch", "annotate"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: fluence-sidecar-integration + namespace: ${NAMESPACE} +subjects: + - kind: ServiceAccount + name: fluence-sidecar-integration + namespace: ${NAMESPACE} +roleRef: + kind: Role + name: fluence-sidecar-integration + apiGroup: rbac.authorization.k8s.io +YAML + +# Create the classical pod (gated, waiting for sidecar) +kubectl apply -n "$NAMESPACE" -f - << YAML +apiVersion: v1 +kind: Pod +metadata: + name: integration-classical + namespace: ${NAMESPACE} + annotations: + braket.quantum/task-arn: "" +spec: + restartPolicy: Never + schedulingGates: + - name: quantum.braket/ready + containers: + - name: classical-worker + image: python:3.11-slim + command: + - python3 + - -c + - | + import os, time + arn = os.environ.get("BRAKET_TASK_ARN", "") + print(f"TASK_ARN={arn}") + assert arn, "BRAKET_TASK_ARN is empty" + print("classical-worker: task ARN received correctly") + # Verify we can retrieve the result from Braket using the ARN + from braket.aws import 
AwsQuantumTask + import asyncio + asyncio.set_event_loop(asyncio.new_event_loop()) + task = AwsQuantumTask(arn=arn) + state = task.state() + print(f"classical-worker: task state={state}") + assert state in ("COMPLETED", "RUNNING"), f"unexpected state: {state}" + print("PASS: classical worker got valid task ARN and confirmed task state") + env: + - name: BRAKET_TASK_ARN + valueFrom: + fieldRef: + fieldPath: metadata.annotations['braket.quantum/task-arn'] + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: aws-braket-credentials + key: AWS_ACCESS_KEY_ID + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: aws-braket-credentials + key: AWS_SECRET_ACCESS_KEY + - name: AWS_DEFAULT_REGION + valueFrom: + secretKeyRef: + name: aws-braket-credentials + key: AWS_DEFAULT_REGION + resources: + requests: + cpu: "100m" + memory: "256Mi" +YAML + +# Create the gateway pod with user-app + real sidecar +kubectl apply -n "$NAMESPACE" -f - << YAML +apiVersion: v1 +kind: Pod +metadata: + name: integration-gateway + namespace: ${NAMESPACE} +spec: + restartPolicy: Never + serviceAccountName: fluence-sidecar-integration + + initContainers: + # user-app: submits a real circuit to SV1 — SDK interceptor tags it + - name: user-app + image: ghcr.io/converged-computing/quantum-braket-braket-gateway:latest + command: + - python3 + - -c + - | + import os, sys + # Install the interceptor (normally injected by webhook) + sys.path.insert(0, "/app") + exec(open("/app/fluence_braket_intercept.py").read()) + + from braket.aws import AwsDevice + from braket.circuits import Circuit + + device = AwsDevice("${DEVICE_ARN}") + bell = Circuit().h(0).cnot(0, 1) + print(f"user-app: submitting circuit to ${BACKEND}") + print(f"user-app: FLUENCE_POD_UID={os.environ.get('FLUENCE_POD_UID', 'NOT SET')}") + task = device.run(bell, shots=10) + print(f"user-app: submitted task {task.id}") + print(f"user-app: tags should include fluence-pod-uid") + env: + - name: FLUENCE_POD_UID + value: 
"${POD_UID}" + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: aws-braket-credentials + key: AWS_ACCESS_KEY_ID + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: aws-braket-credentials + key: AWS_SECRET_ACCESS_KEY + - name: AWS_DEFAULT_REGION + valueFrom: + secretKeyRef: + name: aws-braket-credentials + key: AWS_DEFAULT_REGION + + containers: + # real fluence-sidecar + - name: fluence-sidecar + image: ${SIDECAR_IMAGE} + env: + - name: FLUENCE_POD_UID + value: "${POD_UID}" + - name: FLUENCE_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: FLUENCE_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: FLUENCE_GATED_PODS + value: "integration-classical" + - name: FLUXION_ARN + value: "${DEVICE_ARN}" + - name: FLUENCE_TASK_DISCOVERY_TIMEOUT + value: "120" + - name: FLUENCE_POLL_INTERVAL + value: "10" + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: aws-braket-credentials + key: AWS_ACCESS_KEY_ID + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: aws-braket-credentials + key: AWS_SECRET_ACCESS_KEY + - name: AWS_DEFAULT_REGION + valueFrom: + secretKeyRef: + name: aws-braket-credentials + key: AWS_DEFAULT_REGION + resources: + requests: + cpu: "100m" + memory: "512Mi" +YAML + +log "Pods submitted. Waiting for gateway to reach Running..." + +# Wait for gateway Running +for i in $(seq 1 120); do + phase=$(kubectl get pod integration-gateway -n "$NAMESPACE" \ + -o jsonpath='{.status.phase}' 2>/dev/null || true) + [ "$phase" = "Running" ] && break + sleep 3 +done +[ "$(kubectl get pod integration-gateway -n "$NAMESPACE" \ + -o jsonpath='{.status.phase}')" = "Running" ] \ + || fail "integration-gateway did not reach Running" + +log "Gateway is Running. Waiting for sidecar to ungate classical pod..." 
+ +# Wait for classical pod to be ungated (up to 5 minutes for SV1 queue) +for i in $(seq 1 100); do + phase=$(kubectl get pod integration-classical -n "$NAMESPACE" \ + -o jsonpath='{.status.phase}' 2>/dev/null || true) + { [ "$phase" = "Running" ] || [ "$phase" = "Succeeded" ]; } && break + # Print sidecar progress every 30s + [ $((i % 10)) -eq 0 ] && \ + kubectl logs integration-gateway -n "$NAMESPACE" \ + -c fluence-sidecar --tail=5 2>/dev/null || true + sleep 3 +done + +phase=$(kubectl get pod integration-classical -n "$NAMESPACE" \ + -o jsonpath='{.status.phase}' 2>/dev/null || true) +{ [ "$phase" = "Running" ] || [ "$phase" = "Succeeded" ]; } \ + || fail "integration-classical was not ungated (phase=$phase)" + +log "Classical pod ungated. Checking task ARN annotation..." + +arn=$(kubectl get pod integration-classical -n "$NAMESPACE" \ + -o jsonpath='{.metadata.annotations.braket\.quantum/task-arn}' 2>/dev/null || true) +[ -n "$arn" ] || fail "braket.quantum/task-arn annotation not set" +log "Task ARN: $arn" + +# Wait for classical pod to complete +for i in $(seq 1 60); do + phase=$(kubectl get pod integration-classical -n "$NAMESPACE" \ + -o jsonpath='{.status.phase}' 2>/dev/null || true) + [ "$phase" = "Succeeded" ] && break + [ "$phase" = "Failed" ] && fail "integration-classical Failed" + sleep 3 +done + +[ "$(kubectl get pod integration-classical -n "$NAMESPACE" \ + -o jsonpath='{.status.phase}')" = "Succeeded" ] \ + || fail "integration-classical did not Succeed" + +# Verify classical pod got the ARN and confirmed task state +out=$(kubectl logs integration-classical -n "$NAMESPACE" 2>/dev/null || true) +echo "$out" | grep -q "PASS:" || fail "classical worker did not PASS (logs: $out)" + +log "Sidecar logs:" +kubectl logs integration-gateway -n "$NAMESPACE" -c fluence-sidecar || true + +log "PASS: full braket sidecar integration test complete" +log " SDK interceptor tagged task with fluence-pod-uid" +log " Sidecar discovered task by tag" +log " Sidecar 
# Scheduling gate and priority class names used when ungating classical pods.
_GATE_NAME = "quantum.braket/ready"
_PRIORITY_CLASS = "fluence-quantum-classical"


def log(msg):
    """Print msg to stdout, prefixed with a sidecar tag and a UTC timestamp."""
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    print(f"[fluence-sidecar] {ts} {msg}", flush=True)


def kubectl(args):
    """
    Run ``kubectl <args>`` and return its stripped stdout.

    Raises RuntimeError (carrying stderr) when kubectl exits non-zero.
    """
    result = subprocess.run(
        ["kubectl"] + args,
        capture_output=True, text=True
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"kubectl {' '.join(args)} failed: {result.stderr.strip()}"
        )
    return result.stdout.strip()


def _find_gate_index(pod_name, namespace):
    """
    Locate our gate in the pod's spec.schedulingGates list.

    Returns the zero-based index, None when the pod carries no such gate, or
    0 when the lookup itself fails (the position previously assumed).
    """
    try:
        names = kubectl([
            "get", "pod", pod_name,
            "-n", namespace,
            "-o", "jsonpath={.spec.schedulingGates[*].name}",
        ]).split()
    except RuntimeError as e:
        log(f" WARNING: could not list gates on {pod_name}, assuming index 0: {e}")
        return 0
    return names.index(_GATE_NAME) if _GATE_NAME in names else None


def _remove_gate(pod_name, namespace, index, with_priority):
    """
    Apply one JSON patch that removes the gate at ``index`` and, when
    ``with_priority`` is true, also sets the priority class. The "test" op
    makes the patch fail rather than remove some other gate at that index.
    """
    ops = []
    if with_priority:
        ops.append({
            "op": "add",
            "path": "/spec/priorityClassName",
            "value": _PRIORITY_CLASS,
        })
    ops.append({
        "op": "test",
        "path": f"/spec/schedulingGates/{index}/name",
        "value": _GATE_NAME,
    })
    ops.append({"op": "remove", "path": f"/spec/schedulingGates/{index}"})
    kubectl([
        "patch", "pod", pod_name,
        "-n", namespace,
        "--type=json",
        f"-p={json.dumps(ops)}",
    ])


def ungate_pods(gated_pods, task_arn, namespace):
    """
    For each gated pod:
      1. Patch braket.quantum/task-arn annotation with the task ARN
      2. Set the priority class and remove the quantum.braket/ready gate

    gated_pods: list of pod names (blank entries are skipped)
    task_arn:   the vendor task ARN to propagate (may be empty string if unknown)
    namespace:  Kubernetes namespace

    Failures are logged as warnings and never raised, so one bad pod does
    not stop the remaining pods from being ungated.
    """
    for pod_name in gated_pods:
        pod_name = pod_name.strip()
        if not pod_name:
            continue

        log(f"Ungating pod: {pod_name}")

        # 1. Patch task ARN annotation
        if task_arn:
            try:
                kubectl([
                    "annotate", "pod", pod_name,
                    "-n", namespace,
                    f"braket.quantum/task-arn={task_arn}",
                    "--overwrite",
                ])
                log(f" Patched task ARN onto {pod_name}: {task_arn}")
            except RuntimeError as e:
                log(f" WARNING: could not patch annotation on {pod_name}: {e}")
        else:
            log(f" WARNING: no task ARN available to patch onto {pod_name}")

        # 2. Set high priority class and remove scheduling gate atomically.
        # Priority is set here (not in webhook) to avoid admission controller
        # conflict where priority:0 is already defaulted before our patch.
        index = _find_gate_index(pod_name, namespace)
        if index is None:
            log(f" WARNING: {pod_name} has no {_GATE_NAME} gate — nothing to remove")
            continue

        try:
            _remove_gate(pod_name, namespace, index, with_priority=True)
            log(f" Set priority and removed scheduling gate from {pod_name}")
            continue
        except RuntimeError as e:
            log(f" WARNING: could not patch {pod_name}: {e}")

        # The combined patch is all-or-nothing. If the API server rejected it
        # (NOTE(review): priorityClassName may be immutable on an existing
        # pod — confirm), retry with the gate removal alone so the pod is not
        # left gated forever.
        try:
            _remove_gate(pod_name, namespace, index, with_priority=False)
            log(f" Removed scheduling gate from {pod_name} without setting priority")
        except RuntimeError as e:
            log(f" WARNING: could not remove scheduling gate from {pod_name}: {e}")


def gated_pods_from_env():
    """Read FLUENCE_GATED_PODS env var and return a list of pod names."""
    return [
        p.strip()
        for p in os.environ.get("FLUENCE_GATED_PODS", "").split(",")
        if p.strip()
    ]


def namespace_from_env():
    """Return FLUENCE_NAMESPACE, or "default" when it is unset."""
    return os.environ.get("FLUENCE_NAMESPACE", "default")
The worker pod gets fluence-quantum-classical priority class set +# +# Does NOT test the braket sidecar itself (task discovery, SDK interceptor, +# queue position polling). Those require real AWS credentials and are covered +# by sidecars/braket/test/integration.sh which is run locally. +set -euo pipefail +HERE="$(cd "$(dirname "$0")" && pwd)"; . "${HERE}/lib.sh" + +log "TEST 4: sidecar webhook — RBAC creation, gate injection, sidecar injection" + +kubectl apply -f examples/test/e2e/sidecar-mock-pods.yaml + +# Give webhook time to process the leader pod admission +sleep 3 + +# Print webhook logs — always show these so we can see what happened +log "--- webhook logs ---" +kubectl logs -n kube-system deployment/fluence-webhook --tail=50 || true +log "--- end webhook logs ---" + +# 1. Webhook should have created fluence-sidecar ServiceAccount +log "checking webhook created fluence-sidecar ServiceAccount..." +for i in $(seq 1 30); do + kubectl get serviceaccount fluence-sidecar -n default > /dev/null 2>&1 && break + sleep 2 +done +kubectl get serviceaccount fluence-sidecar -n default \ + || fail "webhook did not create fluence-sidecar ServiceAccount" +log " fluence-sidecar ServiceAccount created" + +# 2. Webhook should have created fluence-sidecar Role +kubectl get role fluence-sidecar -n default \ + || fail "webhook did not create fluence-sidecar Role" +log " fluence-sidecar Role created" + +# 3. Webhook should have created fluence-sidecar RoleBinding +kubectl get rolebinding fluence-sidecar -n default \ + || fail "webhook did not create fluence-sidecar RoleBinding" +log " fluence-sidecar RoleBinding created" + +# 4. Webhook should have copied interceptor ConfigMap into the namespace +kubectl get configmap fluence-braket-interceptor -n default \ + || fail "webhook did not copy fluence-braket-interceptor ConfigMap into namespace" +log " fluence-braket-interceptor ConfigMap copied into namespace" + +# 5. 
Leader pod should have sidecar container injected +log "checking sidecar injected into leader pod..." +wait_pod_phase sidecar-test-leader Running 120 \ + || { kubectl describe pod sidecar-test-leader; fail "sidecar-test-leader did not reach Running"; } +containers=$(kubectl get pod sidecar-test-leader \ + -o jsonpath='{.spec.containers[*].name}') +echo "$containers" | grep -q "fluence-sidecar" \ + || fail "fluence-sidecar container not injected into leader (containers: $containers)" +log " fluence-sidecar container injected into leader" + +# 6. Worker pod should have scheduling gate added by webhook +gate=$(kubectl get pod sidecar-test-worker \ + -o jsonpath='{.spec.schedulingGates[0].name}') +[ "$gate" = "quantum.braket/ready" ] \ + || fail "worker pod does not have quantum.braket/ready gate (got: $gate)" +log " quantum.braket/ready gate set on worker" + +log "PASS: webhook correctly created RBAC, injected sidecar, gated worker" +log "NOTE: fluence-quantum-classical priority is set by the sidecar at ungate time, not the webhook" +log "NOTE: braket sidecar integration test (SDK intercept, tag discovery," +log " queue polling) is in sidecars/braket/test/integration.sh" + +# Only clean up pods and PodGroup — RBAC is namespace infrastructure +# that persists for future quantum workflows in this namespace +kubectl delete -f examples/test/e2e/sidecar-mock-pods.yaml