From 47f251b61f5542283ecd273ee4922e05dd588519 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Wed, 24 Jun 2026 00:18:17 -0400 Subject: [PATCH] perf(appsec): build WAF off the init critical path AppSecProcessor::new zstd-decompresses a ~29KB->322KB ruleset, JSON-parses it, and compiles the libddwaf WAF (tens of ms) synchronously during init. The WAF is only needed once the first request payload is evaluated, which is strictly after the first /next, so this work does not belong on the init critical path. Replace the eager Option>> with a deferred, awaitable handle (Arc>>>>). When AppSec is enabled, the build runs on the blocking pool (spawn_blocking) from a background task; consumers (trace processor and the runtime API proxy) resolve the handle where they actually use the WAF, awaiting the in-flight build if a request somehow arrives before it finishes. The disabled-by-default path stays cheap: the feature flag is checked synchronously and yields no handle and no build. --- bottlecap/src/appsec/mod.rs | 97 +++++++++++++++++++++++++ bottlecap/src/bin/bottlecap/main.rs | 30 +++----- bottlecap/src/proxy/interceptor.rs | 82 ++++++++++++--------- bottlecap/src/traces/trace_agent.rs | 6 +- bottlecap/src/traces/trace_processor.rs | 17 +++-- 5 files changed, 172 insertions(+), 60 deletions(-) diff --git a/bottlecap/src/appsec/mod.rs b/bottlecap/src/appsec/mod.rs index 18755a04c..910f6a953 100644 --- a/bottlecap/src/appsec/mod.rs +++ b/bottlecap/src/appsec/mod.rs @@ -1,9 +1,37 @@ use std::env; +use std::sync::Arc; +use tokio::sync::{Mutex, OnceCell}; +use tracing::error; + +use crate::appsec::processor::{Error as AppSecError, Processor}; use crate::config::Config; pub mod processor; +/// A [`Processor`] shared across the trace agent and the runtime API proxy. +/// +/// Wrapped in a [`Mutex`] (and an [`Arc`] so the value stays [`Send`]) because +/// the WAF context buffer is mutated from multiple asynchronous tasks. +pub type SharedProcessor = Arc>; + +/// A handle to a [`SharedProcessor`] whose construction is deferred off the +/// initialization critical path. +/// +/// Building the WAF (zstd-decompressing and JSON-parsing the ruleset, then +/// compiling it through `libddwaf`) costs tens of milliseconds and is only +/// needed once the first request payload is evaluated — which is strictly after +/// the first `/next`. Rather than block init on that work, consumers hold this +/// awaitable handle and resolve it (via [`resolve`]) at the point where they +/// actually need the WAF. +/// +/// The outer [`Option`] (see [`defer_processor`]) distinguishes the +/// feature-disabled case (no handle at all) from the enabled case. The inner +/// [`Option`] distinguishes a successfully built processor (`Some`) from a build +/// that failed (`None`), in which case the feature is treated as a no-op exactly +/// as it would have been with the previous eager construction. +pub type DeferredProcessor = Arc>>; + /// Determines whether the Serverless App & API Protection features are enabled. #[must_use] pub const fn is_enabled(cfg: &Config) -> bool { @@ -16,3 +44,72 @@ pub const fn is_enabled(cfg: &Config) -> bool { pub fn is_standalone() -> bool { env::var("DD_APM_TRACING_ENABLED").is_ok_and(|s| s.to_lowercase() == "true") } + +/// Prepares a [`DeferredProcessor`] for the App & API Protection feature without +/// blocking the caller on the (CPU-bound) WAF build. +/// +/// Returns [`None`] immediately when the feature is disabled, preserving the +/// cheap, synchronous disabled-by-default path. When enabled, a background task +/// is spawned to build the processor off the critical path, and a handle that +/// resolves to it is returned right away. Consumers call [`resolve`] where they +/// use the WAF; if a request arrives before the background build has finished, +/// that call simply awaits the in-flight build (or kicks it off itself). +#[must_use] +pub fn defer_processor(cfg: &Arc) -> Option { + if !is_enabled(cfg) { + // Feature disabled: nothing to build, and nothing to await. + return None; + } + + let cell: DeferredProcessor = Arc::new(OnceCell::new()); + + // Kick the build off the synchronous init path. The first consumer to call + // `resolve` will await whatever this task produces (or, if it somehow races + // ahead, run an equivalent build itself via `get_or_init`). + let background_cell = Arc::clone(&cell); + let background_cfg = Arc::clone(cfg); + tokio::spawn(async move { + let _ = background_cell + .get_or_init(|| build_processor(background_cfg)) + .await; + }); + + Some(cell) +} + +/// Resolves a [`DeferredProcessor`] to the underlying [`SharedProcessor`], if the +/// WAF was built successfully. +/// +/// Awaits the background build started by [`defer_processor`] when it is still in +/// flight; if no build is running yet, it starts one (off-thread, CPU-bound work +/// runs on the blocking pool). Subsequent calls return immediately. +pub async fn resolve(handle: &DeferredProcessor, cfg: &Arc) -> Option { + handle + .get_or_init(|| build_processor(Arc::clone(cfg))) + .await + .clone() +} + +/// Builds the App & API Protection [`Processor`] on the blocking thread pool. +/// +/// The WAF build is CPU-bound (ruleset decompression, JSON parsing, and WAF +/// compilation), so it is offloaded with [`tokio::task::spawn_blocking`] to keep +/// it off the async worker threads. Returns [`None`] (logging at the appropriate +/// level) when the feature is disabled or the build fails, matching the previous +/// "feature is silently a no-op" behaviour. +async fn build_processor(cfg: Arc) -> Option { + match tokio::task::spawn_blocking(move || Processor::new(&cfg)).await { + Ok(Ok(processor)) => Some(Arc::new(Mutex::new(processor))), + Ok(Err(AppSecError::FeatureDisabled)) => None, + Ok(Err(e)) => { + error!( + "AAP | error creating App & API Protection processor, the feature will be disabled: {e}" + ); + None + } + Err(e) => { + error!("AAP | App & API Protection processor build task failed to join: {e}"); + None + } + } +} diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 440fa2f7b..7553c056d 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -18,9 +18,7 @@ static GLOBAL: Jemalloc = Jemalloc; use bottlecap::{ DOGSTATSD_PORT, LAMBDA_RUNTIME_SLUG, - appsec::processor::{ - Error::FeatureDisabled as AppSecFeatureDisabled, Processor as AppSecProcessor, - }, + appsec::{self, DeferredProcessor as AppSecProcessor}, config::{ self, Config, aws::{AwsConfig, build_lambda_function_arn}, @@ -387,17 +385,12 @@ async fn extension_loop_active( invocation_processor_service.run().await; }); - // AppSec processor (if enabled) - let appsec_processor = match AppSecProcessor::new(config) { - Ok(p) => Some(Arc::new(TokioMutex::new(p))), - Err(AppSecFeatureDisabled) => None, - Err(e) => { - error!( - "AAP | error creating App & API Protection processor, the feature will be disabled: {e}" - ); - None - } - }; + // AppSec processor (if enabled). The WAF build (ruleset decompression, JSON + // parsing, and `libddwaf` compilation) is CPU-bound and only needed once the + // first request payload is evaluated, which is strictly after the first + // `/next`. Defer it off the init critical path: consumers receive an + // awaitable handle and resolve it where they actually use the WAF. + let appsec_processor = appsec::defer_processor(config); let ( trace_agent_channel, @@ -825,7 +818,7 @@ async fn extension_loop_active( async fn wait_for_tombstone_event( event_bus: &mut EventBus, invocation_processor_handle: &InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, tags_provider: Arc, trace_processor: Arc, trace_agent_channel: Sender, @@ -882,7 +875,7 @@ fn cancel_background_services( async fn handle_event_bus_event( event: Event, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, tags_provider: Arc, trace_processor: Arc, trace_agent_channel: Sender, @@ -1149,7 +1142,7 @@ fn start_trace_agent( api_key_factory: &Arc, tags_provider: &Arc, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, client: &Client, ) -> ( Sender, @@ -1477,7 +1470,7 @@ fn start_api_runtime_proxy( config: &Arc, aws_config: Arc, invocation_processor_handle: &InvocationProcessorHandle, - appsec_processor: Option<&Arc>>, + appsec_processor: Option<&AppSecProcessor>, propagator: Arc, ) -> Option { if !should_start_proxy(config, Arc::clone(&aws_config)) { @@ -1491,6 +1484,7 @@ fn start_api_runtime_proxy( invocation_processor_handle.clone(), appsec_processor, propagator, + Arc::clone(config), ) .ok() } diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 21a018377..c8d0270dc 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -2,8 +2,12 @@ use crate::lifecycle::invocation::processor_service::InvocationProcessorHandle; use crate::lifecycle::invocation::triggers::IdentifiedTrigger; use crate::traces::propagation::DatadogCompositePropagator; use crate::{ - appsec::processor::Processor as AppSecProcessor, config::aws::AwsConfig, - extension::EXTENSION_HOST, lwa, proxy::tee_body::TeeBodyWithCompletion, + appsec::{self, DeferredProcessor as AppSecProcessor}, + config::Config, + config::aws::AwsConfig, + extension::EXTENSION_HOST, + lwa, + proxy::tee_body::TeeBodyWithCompletion, }; use axum::{ Router, @@ -32,16 +36,18 @@ type InterceptorState = ( Arc, Arc>, InvocationProcessorHandle, - Option>>, + Option, Arc, Arc>>, + Arc, ); pub fn start( aws_config: Arc, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, propagator: Arc, + config: Arc, ) -> Result> { let socket = get_proxy_socket_address(aws_config.aws_lwa_proxy_lambda_runtime_api.as_ref()); let shutdown_token = CancellationToken::new(); @@ -62,6 +68,7 @@ pub fn start( appsec_processor, propagator, tasks.clone(), + config, ); let tasks_clone = tasks.clone(); @@ -133,9 +140,15 @@ fn get_proxy_socket_address(aws_lwa_proxy_lambda_runtime_api: Option<&String>) - async fn invocation_next_proxy( Path(api_version): Path, - State((aws_config, client, invocation_processor, appsec_processor, propagator, tasks)): State< - InterceptorState, - >, + State(( + aws_config, + client, + invocation_processor, + appsec_processor, + propagator, + tasks, + config, + )): State, request: Request, ) -> Response { debug!("PROXY | invocation_next_proxy | api_version: {api_version}"); @@ -179,20 +192,23 @@ async fn invocation_next_proxy( if let Ok(body) = intercepted_completion_receiver.await { debug!("PROXY | invocation_next_proxy | intercepted body completed"); - if let Some(appsec_processor) = appsec_processor + if let Some(appsec_handle) = &appsec_processor && let Some(request_id) = intercepted_parts_clone .headers .get("Lambda-Runtime-Aws-Request-Id") .and_then(|v| v.to_str().ok()) + && let Ok(trigger) = IdentifiedTrigger::from_slice(&body) { - { - if let Ok(trigger) = IdentifiedTrigger::from_slice(&body) { - appsec_processor - .lock() - .await - .process_invocation_next(request_id, &trigger) - .await; - } + // Resolve the deferred WAF handle, awaiting its off-critical-path + // build if it has not finished yet. This is the first request + // payload to be evaluated, so it is the earliest point the WAF is + // actually needed. + if let Some(appsec_processor) = appsec::resolve(appsec_handle, &config).await { + appsec_processor + .lock() + .await + .process_invocation_next(request_id, &trigger) + .await; } } @@ -223,7 +239,7 @@ async fn invocation_next_proxy( async fn invocation_response_proxy( Path((api_version, request_id)): Path<(String, String)>, - State((aws_config, client, invocation_processor, appsec_processor, _, tasks)): State< + State((aws_config, client, invocation_processor, appsec_processor, _, tasks, config)): State< InterceptorState, >, request: Request, @@ -241,7 +257,9 @@ async fn invocation_response_proxy( join_set.spawn(async move { if let Ok(body) = outgoing_completion_receiver.await { debug!("PROXY | invocation_response_proxy | intercepted outgoing body completed"); - if let Some(appsec_processor) = appsec_processor { + if let Some(appsec_handle) = &appsec_processor + && let Some(appsec_processor) = appsec::resolve(appsec_handle, &config).await + { appsec_processor .lock() .await @@ -304,8 +322,10 @@ async fn invocation_error_proxy( request: Request, ) -> Response { debug!("PROXY | invocation_error_proxy | api_version: {api_version}, request_id: {request_id}"); - let State((_, _, _, appsec_processor, _, _)) = &state; - if let Some(appsec_processor) = appsec_processor { + let State((_, _, _, appsec_processor, _, _, config)) = &state; + if let Some(appsec_handle) = appsec_processor + && let Some(appsec_processor) = appsec::resolve(appsec_handle, config).await + { // Marking any outstanding security context as finalized by sending a blank response. appsec_processor .lock() @@ -318,7 +338,7 @@ async fn invocation_error_proxy( } async fn passthrough_proxy( - State((aws_config, client, _, _, _, _)): State, + State((aws_config, client, _, _, _, _, _)): State, request: Request, ) -> Response { let (parts, body) = request.into_parts(); @@ -434,7 +454,7 @@ where mod tests { use http_body_util::BodyExt; use std::{collections::HashMap, time::Duration}; - use tokio::{sync::Mutex as TokioMutex, time::Instant}; + use tokio::time::Instant; use dogstatsd::{aggregator::AggregatorService, metric::EMPTY_TAGS}; use http_body_util::Full; @@ -444,8 +464,8 @@ mod tests { use super::*; use crate::lifecycle::invocation::processor_service::InvocationProcessorService; use crate::{ - LAMBDA_RUNTIME_SLUG, appsec::processor::Error::FeatureDisabled as AppSecFeatureDisabled, - config::Config, tags::provider::Provider, traces::propagation::DatadogCompositePropagator, + LAMBDA_RUNTIME_SLUG, config::Config, tags::provider::Provider, + traces::propagation::DatadogCompositePropagator, }; #[tokio::test] @@ -513,22 +533,16 @@ mod tests { invocation_processor_service.run().await; }); - let appsec_processor = match AppSecProcessor::new(&config) { - Ok(p) => Some(Arc::new(TokioMutex::new(p))), - Err(AppSecFeatureDisabled) => None, - Err(e) => { - error!( - "PROXY | aap | error creating App & API Protection processor, the feature will be disabled: {e}" - ); - None - } - }; + // App & API Protection is disabled in the default config, so this + // resolves to `None` synchronously (no WAF build is spawned). + let appsec_processor = appsec::defer_processor(&config); let proxy_handle = start( aws_config, invocation_processor_handle, appsec_processor, propagator, + Arc::clone(&config), ) .expect("Failed to start API runtime proxy"); let https = HttpConnector::new(); diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 37b71d29f..537302737 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -23,7 +23,7 @@ use tracing::{debug, error, warn}; use crate::traces::trace_processor::SendingTraceProcessor; use crate::{ - appsec::processor::Processor as AppSecProcessor, + appsec::DeferredProcessor as AppSecProcessor, config, http::{extract_request_body, handler_not_found}, lifecycle::invocation::{ @@ -113,7 +113,7 @@ pub struct TraceAgent { pub proxy_aggregator: Arc>, pub tags_provider: Arc, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, shutdown_token: CancellationToken, tx: Sender, stats_concentrator: StatsConcentratorHandle, @@ -137,7 +137,7 @@ impl TraceAgent { stats_processor: Arc, proxy_aggregator: Arc>, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, tags_provider: Arc, stats_concentrator: StatsConcentratorHandle, span_deduper: DedupHandle, diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 256d8bf14..81da39bd9 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -3,6 +3,7 @@ use crate::appsec::processor::Processor as AppSecProcessor; use crate::appsec::processor::context::HoldArguments; +use crate::appsec::{self, DeferredProcessor as AppSecDeferredProcessor}; use crate::config; use crate::lifecycle::invocation::processor::S_TO_MS; use crate::lifecycle::invocation::triggers::get_default_service_name; @@ -28,7 +29,6 @@ use libdd_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollec use regex::Regex; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::Mutex; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; @@ -498,9 +498,10 @@ impl TraceProcessor for ServerlessTraceProcessor { /// Once ready to flush, the traces are submitted to the provided [`Sender`]. #[derive(Clone)] pub struct SendingTraceProcessor { - /// The [`AppSecProcessor`] to use for security-processing the traces, if - /// configured. - pub appsec: Option>>, + /// A deferred handle to the [`AppSecProcessor`] to use for + /// security-processing the traces, if the feature is enabled. The handle is + /// resolved (awaiting the off-critical-path WAF build) at the point of use. + pub appsec: Option, /// The [`TraceProcessor`] to use for transforming raw traces into /// [`SendDataBuilderInfo`]s before flushing. pub processor: Arc, @@ -522,7 +523,13 @@ impl SendingTraceProcessor { body_size: usize, span_pointers: Option>, ) -> Result<(), SendError> { - traces = if let Some(appsec) = &self.appsec { + // Resolve the deferred AppSec handle (awaiting the off-critical-path WAF + // build if it is still in flight) only when the feature is enabled. + let appsec = match &self.appsec { + Some(handle) => appsec::resolve(handle, &config).await, + None => None, + }; + traces = if let Some(appsec) = &appsec { let mut appsec = appsec.lock().await; traces.into_iter().filter_map(|mut trace| { let Some(span) = AppSecProcessor::service_entry_span_mut(&mut trace) else {