diff --git a/bottlecap/src/appsec/mod.rs b/bottlecap/src/appsec/mod.rs index 18755a04c..910f6a953 100644 --- a/bottlecap/src/appsec/mod.rs +++ b/bottlecap/src/appsec/mod.rs @@ -1,9 +1,37 @@ use std::env; +use std::sync::Arc; +use tokio::sync::{Mutex, OnceCell}; +use tracing::error; + +use crate::appsec::processor::{Error as AppSecError, Processor}; use crate::config::Config; pub mod processor; +/// A [`Processor`] shared across the trace agent and the runtime API proxy. +/// +/// Wrapped in a [`Mutex`] (and an [`Arc`] so the value stays [`Send`]) because +/// the WAF context buffer is mutated from multiple asynchronous tasks. +pub type SharedProcessor = Arc>; + +/// A handle to a [`SharedProcessor`] whose construction is deferred off the +/// initialization critical path. +/// +/// Building the WAF (zstd-decompressing and JSON-parsing the ruleset, then +/// compiling it through `libddwaf`) costs tens of milliseconds and is only +/// needed once the first request payload is evaluated — which is strictly after +/// the first `/next`. Rather than block init on that work, consumers hold this +/// awaitable handle and resolve it (via [`resolve`]) at the point where they +/// actually need the WAF. +/// +/// The outer [`Option`] (see [`defer_processor`]) distinguishes the +/// feature-disabled case (no handle at all) from the enabled case. The inner +/// [`Option`] distinguishes a successfully built processor (`Some`) from a build +/// that failed (`None`), in which case the feature is treated as a no-op exactly +/// as it would have been with the previous eager construction. +pub type DeferredProcessor = Arc>>; + /// Determines whether the Serverless App & API Protection features are enabled. #[must_use] pub const fn is_enabled(cfg: &Config) -> bool { @@ -16,3 +44,72 @@ pub const fn is_enabled(cfg: &Config) -> bool { pub fn is_standalone() -> bool { env::var("DD_APM_TRACING_ENABLED").is_ok_and(|s| s.to_lowercase() == "true") } + +/// Prepares a [`DeferredProcessor`] for the App & API Protection feature without +/// blocking the caller on the (CPU-bound) WAF build. +/// +/// Returns [`None`] immediately when the feature is disabled, preserving the +/// cheap, synchronous disabled-by-default path. When enabled, a background task +/// is spawned to build the processor off the critical path, and a handle that +/// resolves to it is returned right away. Consumers call [`resolve`] where they +/// use the WAF; if a request arrives before the background build has finished, +/// that call simply awaits the in-flight build (or kicks it off itself). +#[must_use] +pub fn defer_processor(cfg: &Arc) -> Option { + if !is_enabled(cfg) { + // Feature disabled: nothing to build, and nothing to await. + return None; + } + + let cell: DeferredProcessor = Arc::new(OnceCell::new()); + + // Kick the build off the synchronous init path. The first consumer to call + // `resolve` will await whatever this task produces (or, if it somehow races + // ahead, run an equivalent build itself via `get_or_init`). + let background_cell = Arc::clone(&cell); + let background_cfg = Arc::clone(cfg); + tokio::spawn(async move { + let _ = background_cell + .get_or_init(|| build_processor(background_cfg)) + .await; + }); + + Some(cell) +} + +/// Resolves a [`DeferredProcessor`] to the underlying [`SharedProcessor`], if the +/// WAF was built successfully. +/// +/// Awaits the background build started by [`defer_processor`] when it is still in +/// flight; if no build is running yet, it starts one (off-thread, CPU-bound work +/// runs on the blocking pool). Subsequent calls return immediately. +pub async fn resolve(handle: &DeferredProcessor, cfg: &Arc) -> Option { + handle + .get_or_init(|| build_processor(Arc::clone(cfg))) + .await + .clone() +} + +/// Builds the App & API Protection [`Processor`] on the blocking thread pool. +/// +/// The WAF build is CPU-bound (ruleset decompression, JSON parsing, and WAF +/// compilation), so it is offloaded with [`tokio::task::spawn_blocking`] to keep +/// it off the async worker threads. Returns [`None`] (logging at the appropriate +/// level) when the feature is disabled or the build fails, matching the previous +/// "feature is silently a no-op" behaviour. +async fn build_processor(cfg: Arc) -> Option { + match tokio::task::spawn_blocking(move || Processor::new(&cfg)).await { + Ok(Ok(processor)) => Some(Arc::new(Mutex::new(processor))), + Ok(Err(AppSecError::FeatureDisabled)) => None, + Ok(Err(e)) => { + error!( + "AAP | error creating App & API Protection processor, the feature will be disabled: {e}" + ); + None + } + Err(e) => { + error!("AAP | App & API Protection processor build task failed to join: {e}"); + None + } + } +} diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 440fa2f7b..7553c056d 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -18,9 +18,7 @@ static GLOBAL: Jemalloc = Jemalloc; use bottlecap::{ DOGSTATSD_PORT, LAMBDA_RUNTIME_SLUG, - appsec::processor::{ - Error::FeatureDisabled as AppSecFeatureDisabled, Processor as AppSecProcessor, - }, + appsec::{self, DeferredProcessor as AppSecProcessor}, config::{ self, Config, aws::{AwsConfig, build_lambda_function_arn}, @@ -387,17 +385,12 @@ async fn extension_loop_active( invocation_processor_service.run().await; }); - // AppSec processor (if enabled) - let appsec_processor = match AppSecProcessor::new(config) { - Ok(p) => Some(Arc::new(TokioMutex::new(p))), - Err(AppSecFeatureDisabled) => None, - Err(e) => { - error!( - "AAP | error creating App & API Protection processor, the feature will be disabled: {e}" - ); - None - } - }; + // AppSec processor (if enabled). The WAF build (ruleset decompression, JSON + // parsing, and `libddwaf` compilation) is CPU-bound and only needed once the + // first request payload is evaluated, which is strictly after the first + // `/next`. Defer it off the init critical path: consumers receive an + // awaitable handle and resolve it where they actually use the WAF. + let appsec_processor = appsec::defer_processor(config); let ( trace_agent_channel, @@ -825,7 +818,7 @@ async fn extension_loop_active( async fn wait_for_tombstone_event( event_bus: &mut EventBus, invocation_processor_handle: &InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, tags_provider: Arc, trace_processor: Arc, trace_agent_channel: Sender, @@ -882,7 +875,7 @@ fn cancel_background_services( async fn handle_event_bus_event( event: Event, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, tags_provider: Arc, trace_processor: Arc, trace_agent_channel: Sender, @@ -1149,7 +1142,7 @@ fn start_trace_agent( api_key_factory: &Arc, tags_provider: &Arc, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, client: &Client, ) -> ( Sender, @@ -1477,7 +1470,7 @@ fn start_api_runtime_proxy( config: &Arc, aws_config: Arc, invocation_processor_handle: &InvocationProcessorHandle, - appsec_processor: Option<&Arc>>, + appsec_processor: Option<&AppSecProcessor>, propagator: Arc, ) -> Option { if !should_start_proxy(config, Arc::clone(&aws_config)) { @@ -1491,6 +1484,7 @@ fn start_api_runtime_proxy( invocation_processor_handle.clone(), appsec_processor, propagator, + Arc::clone(config), ) .ok() } diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 21a018377..c8d0270dc 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -2,8 +2,12 @@ use crate::lifecycle::invocation::processor_service::InvocationProcessorHandle; use crate::lifecycle::invocation::triggers::IdentifiedTrigger; use crate::traces::propagation::DatadogCompositePropagator; use crate::{ - appsec::processor::Processor as AppSecProcessor, config::aws::AwsConfig, - extension::EXTENSION_HOST, lwa, proxy::tee_body::TeeBodyWithCompletion, + appsec::{self, DeferredProcessor as AppSecProcessor}, + config::Config, + config::aws::AwsConfig, + extension::EXTENSION_HOST, + lwa, + proxy::tee_body::TeeBodyWithCompletion, }; use axum::{ Router, @@ -32,16 +36,18 @@ type InterceptorState = ( Arc, Arc>, InvocationProcessorHandle, - Option>>, + Option, Arc, Arc>>, + Arc, ); pub fn start( aws_config: Arc, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, propagator: Arc, + config: Arc, ) -> Result> { let socket = get_proxy_socket_address(aws_config.aws_lwa_proxy_lambda_runtime_api.as_ref()); let shutdown_token = CancellationToken::new(); @@ -62,6 +68,7 @@ pub fn start( appsec_processor, propagator, tasks.clone(), + config, ); let tasks_clone = tasks.clone(); @@ -133,9 +140,15 @@ fn get_proxy_socket_address(aws_lwa_proxy_lambda_runtime_api: Option<&String>) - async fn invocation_next_proxy( Path(api_version): Path, - State((aws_config, client, invocation_processor, appsec_processor, propagator, tasks)): State< - InterceptorState, - >, + State(( + aws_config, + client, + invocation_processor, + appsec_processor, + propagator, + tasks, + config, + )): State, request: Request, ) -> Response { debug!("PROXY | invocation_next_proxy | api_version: {api_version}"); @@ -179,20 +192,23 @@ async fn invocation_next_proxy( if let Ok(body) = intercepted_completion_receiver.await { debug!("PROXY | invocation_next_proxy | intercepted body completed"); - if let Some(appsec_processor) = appsec_processor + if let Some(appsec_handle) = &appsec_processor && let Some(request_id) = intercepted_parts_clone .headers .get("Lambda-Runtime-Aws-Request-Id") .and_then(|v| v.to_str().ok()) + && let Ok(trigger) = IdentifiedTrigger::from_slice(&body) { - { - if let Ok(trigger) = IdentifiedTrigger::from_slice(&body) { - appsec_processor - .lock() - .await - .process_invocation_next(request_id, &trigger) - .await; - } + // Resolve the deferred WAF handle, awaiting its off-critical-path + // build if it has not finished yet. This is the first request + // payload to be evaluated, so it is the earliest point the WAF is + // actually needed. + if let Some(appsec_processor) = appsec::resolve(appsec_handle, &config).await { + appsec_processor + .lock() + .await + .process_invocation_next(request_id, &trigger) + .await; } } @@ -223,7 +239,7 @@ async fn invocation_next_proxy( async fn invocation_response_proxy( Path((api_version, request_id)): Path<(String, String)>, - State((aws_config, client, invocation_processor, appsec_processor, _, tasks)): State< + State((aws_config, client, invocation_processor, appsec_processor, _, tasks, config)): State< InterceptorState, >, request: Request, @@ -241,7 +257,9 @@ async fn invocation_response_proxy( join_set.spawn(async move { if let Ok(body) = outgoing_completion_receiver.await { debug!("PROXY | invocation_response_proxy | intercepted outgoing body completed"); - if let Some(appsec_processor) = appsec_processor { + if let Some(appsec_handle) = &appsec_processor + && let Some(appsec_processor) = appsec::resolve(appsec_handle, &config).await + { appsec_processor .lock() .await @@ -304,8 +322,10 @@ async fn invocation_error_proxy( request: Request, ) -> Response { debug!("PROXY | invocation_error_proxy | api_version: {api_version}, request_id: {request_id}"); - let State((_, _, _, appsec_processor, _, _)) = &state; - if let Some(appsec_processor) = appsec_processor { + let State((_, _, _, appsec_processor, _, _, config)) = &state; + if let Some(appsec_handle) = appsec_processor + && let Some(appsec_processor) = appsec::resolve(appsec_handle, config).await + { // Marking any outstanding security context as finalized by sending a blank response. appsec_processor .lock() @@ -318,7 +338,7 @@ async fn invocation_error_proxy( } async fn passthrough_proxy( - State((aws_config, client, _, _, _, _)): State, + State((aws_config, client, _, _, _, _, _)): State, request: Request, ) -> Response { let (parts, body) = request.into_parts(); @@ -434,7 +454,7 @@ where mod tests { use http_body_util::BodyExt; use std::{collections::HashMap, time::Duration}; - use tokio::{sync::Mutex as TokioMutex, time::Instant}; + use tokio::time::Instant; use dogstatsd::{aggregator::AggregatorService, metric::EMPTY_TAGS}; use http_body_util::Full; @@ -444,8 +464,8 @@ mod tests { use super::*; use crate::lifecycle::invocation::processor_service::InvocationProcessorService; use crate::{ - LAMBDA_RUNTIME_SLUG, appsec::processor::Error::FeatureDisabled as AppSecFeatureDisabled, - config::Config, tags::provider::Provider, traces::propagation::DatadogCompositePropagator, + LAMBDA_RUNTIME_SLUG, config::Config, tags::provider::Provider, + traces::propagation::DatadogCompositePropagator, }; #[tokio::test] @@ -513,22 +533,16 @@ mod tests { invocation_processor_service.run().await; }); - let appsec_processor = match AppSecProcessor::new(&config) { - Ok(p) => Some(Arc::new(TokioMutex::new(p))), - Err(AppSecFeatureDisabled) => None, - Err(e) => { - error!( - "PROXY | aap | error creating App & API Protection processor, the feature will be disabled: {e}" - ); - None - } - }; + // App & API Protection is disabled in the default config, so this + // resolves to `None` synchronously (no WAF build is spawned). + let appsec_processor = appsec::defer_processor(&config); let proxy_handle = start( aws_config, invocation_processor_handle, appsec_processor, propagator, + Arc::clone(&config), ) .expect("Failed to start API runtime proxy"); let https = HttpConnector::new(); diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 37b71d29f..537302737 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -23,7 +23,7 @@ use tracing::{debug, error, warn}; use crate::traces::trace_processor::SendingTraceProcessor; use crate::{ - appsec::processor::Processor as AppSecProcessor, + appsec::DeferredProcessor as AppSecProcessor, config, http::{extract_request_body, handler_not_found}, lifecycle::invocation::{ @@ -113,7 +113,7 @@ pub struct TraceAgent { pub proxy_aggregator: Arc>, pub tags_provider: Arc, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, shutdown_token: CancellationToken, tx: Sender, stats_concentrator: StatsConcentratorHandle, @@ -137,7 +137,7 @@ impl TraceAgent { stats_processor: Arc, proxy_aggregator: Arc>, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, tags_provider: Arc, stats_concentrator: StatsConcentratorHandle, span_deduper: DedupHandle, diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 256d8bf14..81da39bd9 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -3,6 +3,7 @@ use crate::appsec::processor::Processor as AppSecProcessor; use crate::appsec::processor::context::HoldArguments; +use crate::appsec::{self, DeferredProcessor as AppSecDeferredProcessor}; use crate::config; use crate::lifecycle::invocation::processor::S_TO_MS; use crate::lifecycle::invocation::triggers::get_default_service_name; @@ -28,7 +29,6 @@ use libdd_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollec use regex::Regex; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::Mutex; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; @@ -498,9 +498,10 @@ impl TraceProcessor for ServerlessTraceProcessor { /// Once ready to flush, the traces are submitted to the provided [`Sender`]. #[derive(Clone)] pub struct SendingTraceProcessor { - /// The [`AppSecProcessor`] to use for security-processing the traces, if - /// configured. - pub appsec: Option>>, + /// A deferred handle to the [`AppSecProcessor`] to use for + /// security-processing the traces, if the feature is enabled. The handle is + /// resolved (awaiting the off-critical-path WAF build) at the point of use. + pub appsec: Option, /// The [`TraceProcessor`] to use for transforming raw traces into /// [`SendDataBuilderInfo`]s before flushing. pub processor: Arc, @@ -522,7 +523,13 @@ impl SendingTraceProcessor { body_size: usize, span_pointers: Option>, ) -> Result<(), SendError> { - traces = if let Some(appsec) = &self.appsec { + // Resolve the deferred AppSec handle (awaiting the off-critical-path WAF + // build if it is still in flight) only when the feature is enabled. + let appsec = match &self.appsec { + Some(handle) => appsec::resolve(handle, &config).await, + None => None, + }; + traces = if let Some(appsec) = &appsec { let mut appsec = appsec.lock().await; traces.into_iter().filter_map(|mut trace| { let Some(span) = AppSecProcessor::service_entry_span_mut(&mut trace) else {