Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions bottlecap/src/appsec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,37 @@
use std::env;
use std::sync::Arc;

use tokio::sync::{Mutex, OnceCell};
use tracing::error;

use crate::appsec::processor::{Error as AppSecError, Processor};
use crate::config::Config;

pub mod processor;

/// A [`Processor`] shared across the trace agent and the runtime API proxy.
///
/// Wrapped in a [`Mutex`] (and an [`Arc`] so the value stays [`Send`]) because
/// the WAF context buffer is mutated from multiple asynchronous tasks.
pub type SharedProcessor = Arc<Mutex<Processor>>;

/// A handle to a [`SharedProcessor`] whose construction is deferred off the
/// initialization critical path.
///
/// Building the WAF (zstd-decompressing and JSON-parsing the ruleset, then
/// compiling it through `libddwaf`) costs tens of milliseconds and is only
/// needed once the first request payload is evaluated — which is strictly after
/// the first `/next`. Rather than block init on that work, consumers hold this
/// awaitable handle and resolve it (via [`resolve`]) at the point where they
/// actually need the WAF.
///
/// The outer [`Option`] (see [`defer_processor`]) distinguishes the
/// feature-disabled case (no handle at all) from the enabled case. The inner
/// [`Option`] distinguishes a successfully built processor (`Some`) from a build
/// that failed (`None`), in which case the feature is treated as a no-op exactly
/// as it would have been with the previous eager construction.
pub type DeferredProcessor = Arc<OnceCell<Option<SharedProcessor>>>;

/// Determines whether the Serverless App & API Protection features are enabled.
#[must_use]
pub const fn is_enabled(cfg: &Config) -> bool {
Expand All @@ -16,3 +44,72 @@ pub const fn is_enabled(cfg: &Config) -> bool {
pub fn is_standalone() -> bool {
env::var("DD_APM_TRACING_ENABLED").is_ok_and(|s| s.to_lowercase() == "true")
}

/// Prepares a [`DeferredProcessor`] for the App & API Protection feature without
/// blocking the caller on the (CPU-bound) WAF build.
///
/// Returns [`None`] immediately when the feature is disabled, preserving the
/// cheap, synchronous disabled-by-default path. When enabled, a background task
/// is spawned to build the processor off the critical path, and a handle that
/// resolves to it is returned right away. Consumers call [`resolve`] where they
/// use the WAF; if a request arrives before the background build has finished,
/// that call simply awaits the in-flight build (or kicks it off itself).
#[must_use]
pub fn defer_processor(cfg: &Arc<Config>) -> Option<DeferredProcessor> {
if !is_enabled(cfg) {
// Feature disabled: nothing to build, and nothing to await.
return None;
}

let cell: DeferredProcessor = Arc::new(OnceCell::new());

// Kick the build off the synchronous init path. The first consumer to call
// `resolve` will await whatever this task produces (or, if it somehow races
// ahead, run an equivalent build itself via `get_or_init`).
let background_cell = Arc::clone(&cell);
let background_cfg = Arc::clone(cfg);
tokio::spawn(async move {
let _ = background_cell
.get_or_init(|| build_processor(background_cfg))
.await;
});

Some(cell)
}

/// Resolves a [`DeferredProcessor`] to the underlying [`SharedProcessor`], if the
/// WAF was built successfully.
///
/// Awaits the background build started by [`defer_processor`] when it is still in
/// flight; if no build is running yet, it starts one (off-thread, CPU-bound work
/// runs on the blocking pool). Subsequent calls return immediately.
pub async fn resolve(handle: &DeferredProcessor, cfg: &Arc<Config>) -> Option<SharedProcessor> {
handle
.get_or_init(|| build_processor(Arc::clone(cfg)))
.await
.clone()
}

/// Builds the App & API Protection [`Processor`] on the blocking thread pool.
///
/// The WAF build is CPU-bound (ruleset decompression, JSON parsing, and WAF
/// compilation), so it is offloaded with [`tokio::task::spawn_blocking`] to keep
/// it off the async worker threads. Returns [`None`] (logging at the appropriate
/// level) when the feature is disabled or the build fails, matching the previous
/// "feature is silently a no-op" behaviour.
async fn build_processor(cfg: Arc<Config>) -> Option<SharedProcessor> {
match tokio::task::spawn_blocking(move || Processor::new(&cfg)).await {
Ok(Ok(processor)) => Some(Arc::new(Mutex::new(processor))),
Ok(Err(AppSecError::FeatureDisabled)) => None,
Ok(Err(e)) => {
error!(
"AAP | error creating App & API Protection processor, the feature will be disabled: {e}"
);
None
}
Err(e) => {
error!("AAP | App & API Protection processor build task failed to join: {e}");
None
}
}
}
30 changes: 12 additions & 18 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ static GLOBAL: Jemalloc = Jemalloc;

use bottlecap::{
DOGSTATSD_PORT, LAMBDA_RUNTIME_SLUG,
appsec::processor::{
Error::FeatureDisabled as AppSecFeatureDisabled, Processor as AppSecProcessor,
},
appsec::{self, DeferredProcessor as AppSecProcessor},
config::{
self, Config,
aws::{AwsConfig, build_lambda_function_arn},
Expand Down Expand Up @@ -387,17 +385,12 @@ async fn extension_loop_active(
invocation_processor_service.run().await;
});

// AppSec processor (if enabled)
let appsec_processor = match AppSecProcessor::new(config) {
Ok(p) => Some(Arc::new(TokioMutex::new(p))),
Err(AppSecFeatureDisabled) => None,
Err(e) => {
error!(
"AAP | error creating App & API Protection processor, the feature will be disabled: {e}"
);
None
}
};
// AppSec processor (if enabled). The WAF build (ruleset decompression, JSON
// parsing, and `libddwaf` compilation) is CPU-bound and only needed once the
// first request payload is evaluated, which is strictly after the first
// `/next`. Defer it off the init critical path: consumers receive an
// awaitable handle and resolve it where they actually use the WAF.
let appsec_processor = appsec::defer_processor(config);

let (
trace_agent_channel,
Expand Down Expand Up @@ -825,7 +818,7 @@ async fn extension_loop_active(
async fn wait_for_tombstone_event(
event_bus: &mut EventBus,
invocation_processor_handle: &InvocationProcessorHandle,
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
appsec_processor: Option<AppSecProcessor>,
tags_provider: Arc<TagProvider>,
trace_processor: Arc<trace_processor::ServerlessTraceProcessor>,
trace_agent_channel: Sender<SendDataBuilderInfo>,
Expand Down Expand Up @@ -882,7 +875,7 @@ fn cancel_background_services(
async fn handle_event_bus_event(
event: Event,
invocation_processor_handle: InvocationProcessorHandle,
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
appsec_processor: Option<AppSecProcessor>,
tags_provider: Arc<TagProvider>,
trace_processor: Arc<trace_processor::ServerlessTraceProcessor>,
trace_agent_channel: Sender<SendDataBuilderInfo>,
Expand Down Expand Up @@ -1149,7 +1142,7 @@ fn start_trace_agent(
api_key_factory: &Arc<ApiKeyFactory>,
tags_provider: &Arc<TagProvider>,
invocation_processor_handle: InvocationProcessorHandle,
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
appsec_processor: Option<AppSecProcessor>,
client: &Client,
) -> (
Sender<SendDataBuilderInfo>,
Expand Down Expand Up @@ -1477,7 +1470,7 @@ fn start_api_runtime_proxy(
config: &Arc<Config>,
aws_config: Arc<AwsConfig>,
invocation_processor_handle: &InvocationProcessorHandle,
appsec_processor: Option<&Arc<TokioMutex<AppSecProcessor>>>,
appsec_processor: Option<&AppSecProcessor>,
propagator: Arc<DatadogCompositePropagator>,
) -> Option<CancellationToken> {
if !should_start_proxy(config, Arc::clone(&aws_config)) {
Expand All @@ -1491,6 +1484,7 @@ fn start_api_runtime_proxy(
invocation_processor_handle.clone(),
appsec_processor,
propagator,
Arc::clone(config),
)
.ok()
}
Expand Down
82 changes: 48 additions & 34 deletions bottlecap/src/proxy/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ use crate::lifecycle::invocation::processor_service::InvocationProcessorHandle;
use crate::lifecycle::invocation::triggers::IdentifiedTrigger;
use crate::traces::propagation::DatadogCompositePropagator;
use crate::{
appsec::processor::Processor as AppSecProcessor, config::aws::AwsConfig,
extension::EXTENSION_HOST, lwa, proxy::tee_body::TeeBodyWithCompletion,
appsec::{self, DeferredProcessor as AppSecProcessor},
config::Config,
config::aws::AwsConfig,
extension::EXTENSION_HOST,
lwa,
proxy::tee_body::TeeBodyWithCompletion,
};
use axum::{
Router,
Expand Down Expand Up @@ -32,16 +36,18 @@ type InterceptorState = (
Arc<AwsConfig>,
Arc<Client<HttpConnector, Body>>,
InvocationProcessorHandle,
Option<Arc<Mutex<AppSecProcessor>>>,
Option<AppSecProcessor>,
Arc<DatadogCompositePropagator>,
Arc<Mutex<JoinSet<()>>>,
Arc<Config>,
);

pub fn start(
aws_config: Arc<AwsConfig>,
invocation_processor_handle: InvocationProcessorHandle,
appsec_processor: Option<Arc<Mutex<AppSecProcessor>>>,
appsec_processor: Option<AppSecProcessor>,
propagator: Arc<DatadogCompositePropagator>,
config: Arc<Config>,
) -> Result<CancellationToken, Box<dyn std::error::Error>> {
let socket = get_proxy_socket_address(aws_config.aws_lwa_proxy_lambda_runtime_api.as_ref());
let shutdown_token = CancellationToken::new();
Expand All @@ -62,6 +68,7 @@ pub fn start(
appsec_processor,
propagator,
tasks.clone(),
config,
);

let tasks_clone = tasks.clone();
Expand Down Expand Up @@ -133,9 +140,15 @@ fn get_proxy_socket_address(aws_lwa_proxy_lambda_runtime_api: Option<&String>) -

async fn invocation_next_proxy(
Path(api_version): Path<String>,
State((aws_config, client, invocation_processor, appsec_processor, propagator, tasks)): State<
InterceptorState,
>,
State((
aws_config,
client,
invocation_processor,
appsec_processor,
propagator,
tasks,
config,
)): State<InterceptorState>,
request: Request,
) -> Response {
debug!("PROXY | invocation_next_proxy | api_version: {api_version}");
Expand Down Expand Up @@ -179,20 +192,23 @@ async fn invocation_next_proxy(
if let Ok(body) = intercepted_completion_receiver.await {
debug!("PROXY | invocation_next_proxy | intercepted body completed");

if let Some(appsec_processor) = appsec_processor
if let Some(appsec_handle) = &appsec_processor
&& let Some(request_id) = intercepted_parts_clone
.headers
.get("Lambda-Runtime-Aws-Request-Id")
.and_then(|v| v.to_str().ok())
&& let Ok(trigger) = IdentifiedTrigger::from_slice(&body)
{
{
if let Ok(trigger) = IdentifiedTrigger::from_slice(&body) {
appsec_processor
.lock()
.await
.process_invocation_next(request_id, &trigger)
.await;
}
// Resolve the deferred WAF handle, awaiting its off-critical-path
// build if it has not finished yet. This is the first request
// payload to be evaluated, so it is the earliest point the WAF is
// actually needed.
if let Some(appsec_processor) = appsec::resolve(appsec_handle, &config).await {
appsec_processor
.lock()
.await
.process_invocation_next(request_id, &trigger)
.await;
}
}

Expand Down Expand Up @@ -223,7 +239,7 @@ async fn invocation_next_proxy(

async fn invocation_response_proxy(
Path((api_version, request_id)): Path<(String, String)>,
State((aws_config, client, invocation_processor, appsec_processor, _, tasks)): State<
State((aws_config, client, invocation_processor, appsec_processor, _, tasks, config)): State<
InterceptorState,
>,
request: Request,
Expand All @@ -241,7 +257,9 @@ async fn invocation_response_proxy(
join_set.spawn(async move {
if let Ok(body) = outgoing_completion_receiver.await {
debug!("PROXY | invocation_response_proxy | intercepted outgoing body completed");
if let Some(appsec_processor) = appsec_processor {
if let Some(appsec_handle) = &appsec_processor
&& let Some(appsec_processor) = appsec::resolve(appsec_handle, &config).await
{
appsec_processor
.lock()
.await
Expand Down Expand Up @@ -304,8 +322,10 @@ async fn invocation_error_proxy(
request: Request,
) -> Response {
debug!("PROXY | invocation_error_proxy | api_version: {api_version}, request_id: {request_id}");
let State((_, _, _, appsec_processor, _, _)) = &state;
if let Some(appsec_processor) = appsec_processor {
let State((_, _, _, appsec_processor, _, _, config)) = &state;
if let Some(appsec_handle) = appsec_processor
&& let Some(appsec_processor) = appsec::resolve(appsec_handle, config).await
{
// Marking any outstanding security context as finalized by sending a blank response.
appsec_processor
.lock()
Expand All @@ -318,7 +338,7 @@ async fn invocation_error_proxy(
}

async fn passthrough_proxy(
State((aws_config, client, _, _, _, _)): State<InterceptorState>,
State((aws_config, client, _, _, _, _, _)): State<InterceptorState>,
request: Request,
) -> Response {
let (parts, body) = request.into_parts();
Expand Down Expand Up @@ -434,7 +454,7 @@ where
mod tests {
use http_body_util::BodyExt;
use std::{collections::HashMap, time::Duration};
use tokio::{sync::Mutex as TokioMutex, time::Instant};
use tokio::time::Instant;

use dogstatsd::{aggregator::AggregatorService, metric::EMPTY_TAGS};
use http_body_util::Full;
Expand All @@ -444,8 +464,8 @@ mod tests {
use super::*;
use crate::lifecycle::invocation::processor_service::InvocationProcessorService;
use crate::{
LAMBDA_RUNTIME_SLUG, appsec::processor::Error::FeatureDisabled as AppSecFeatureDisabled,
config::Config, tags::provider::Provider, traces::propagation::DatadogCompositePropagator,
LAMBDA_RUNTIME_SLUG, config::Config, tags::provider::Provider,
traces::propagation::DatadogCompositePropagator,
};

#[tokio::test]
Expand Down Expand Up @@ -513,22 +533,16 @@ mod tests {
invocation_processor_service.run().await;
});

let appsec_processor = match AppSecProcessor::new(&config) {
Ok(p) => Some(Arc::new(TokioMutex::new(p))),
Err(AppSecFeatureDisabled) => None,
Err(e) => {
error!(
"PROXY | aap | error creating App & API Protection processor, the feature will be disabled: {e}"
);
None
}
};
// App & API Protection is disabled in the default config, so this
// resolves to `None` synchronously (no WAF build is spawned).
let appsec_processor = appsec::defer_processor(&config);

let proxy_handle = start(
aws_config,
invocation_processor_handle,
appsec_processor,
propagator,
Arc::clone(&config),
)
.expect("Failed to start API runtime proxy");
let https = HttpConnector::new();
Expand Down
Loading
Loading