diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index b353820ce..b5839eaf1 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -389,7 +389,7 @@ version = "0.72.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" dependencies = [ - "bitflags 2.11.0", + "bitflags", "cexpr", "clang-sys", "itertools 0.11.0", @@ -433,12 +433,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.11.0" @@ -519,13 +513,13 @@ dependencies = [ "mime", "mock_instant", "multipart", - "nix 0.26.4", + "nix", "opentelemetry-proto", - "opentelemetry-semantic-conventions 0.30.0", + "opentelemetry-semantic-conventions", "ordered_hash_map", "proptest", "prost 0.14.3", - "rand 0.8.6", + "rand 0.9.4", "regex", "reqwest", "rmp-serde", @@ -541,7 +535,7 @@ dependencies = [ "serial_test", "sha2", "tempfile", - "thiserror 1.0.69", + "thiserror 2.0.18", "tikv-jemallocator", "time", "tokio", @@ -828,7 +822,7 @@ dependencies = [ "libdd-trace-utils 2.0.2", "lru", "opentelemetry", - "opentelemetry-semantic-conventions 0.31.0", + "opentelemetry-semantic-conventions", "opentelemetry_sdk", "rand 0.8.6", "rustc_version_runtime", @@ -1923,7 +1917,7 @@ dependencies = [ "hyper 1.8.1", "hyper-util", "libc", - "nix 0.29.0", + "nix", "pin-project", "regex", "serde", @@ -1954,7 +1948,7 @@ dependencies = [ "hyper-rustls", "hyper-util", "libc", - "nix 0.29.0", + "nix", "pin-project", "regex", "rustls", @@ -2282,7 +2276,7 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ddbf48fd451246b1f8c2610bd3b4ac0cc6e149d89832867093ab69a17194f08" dependencies = [ - "bitflags 2.11.0", + "bitflags", "libc", "plain", "redox_syscall 0.7.3", @@ -2439,24 +2433,13 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" -[[package]] -name = "nix" -version = "0.26.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" -dependencies = [ - "bitflags 1.3.2", - "cfg-if", - "libc", -] - [[package]] name = "nix" version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags 2.11.0", + "bitflags", "cfg-if", "cfg_aliases", "libc", @@ -2530,12 +2513,6 @@ dependencies = [ "tonic-prost", ] -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.30.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2" - [[package]] name = "opentelemetry-semantic-conventions" version = "0.31.0" @@ -2806,7 +2783,7 @@ checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744" dependencies = [ "bit-set 0.8.0", "bit-vec 0.8.0", - "bitflags 2.11.0", + "bitflags", "num-traits", "rand 0.9.4", "rand_chacha 0.9.0", @@ -3109,7 +3086,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags 2.11.0", + "bitflags", ] [[package]] @@ -3118,7 +3095,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" dependencies = [ - "bitflags 2.11.0", + "bitflags", ] [[package]] @@ -3296,7 +3273,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags 2.11.0", + "bitflags", "errno", "libc", "linux-raw-sys 0.4.15", @@ -3309,7 +3286,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags 2.11.0", + "bitflags", "errno", "libc", "linux-raw-sys 0.12.1", @@ -3449,7 +3426,7 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ - "bitflags 2.11.0", + "bitflags", "core-foundation", "core-foundation-sys", "libc", @@ -4151,7 +4128,7 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "bitflags 2.11.0", + "bitflags", "bytes", "futures-util", "http 1.4.0", @@ -4500,7 +4477,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags 2.11.0", + "bitflags", "hashbrown 0.15.5", "indexmap", "semver", @@ -4891,7 +4868,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags 2.11.0", + "bitflags", "indexmap", "log", "serde", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 714afa77f..8495af513 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -22,13 +22,13 @@ lazy_static = { version = "1.5", default-features = false } log = { version = "0.4", default-features = false } mime = { version = "0.3", default-features = false } multipart = { version = "0.18", default-features = false, features = ["server"] } -nix = { version = "0.26", default-features = false, features = ["feature", "fs"] } +nix = { version = "0.29", default-features = false, features = ["feature", "fs"] } ordered_hash_map = { version = "0.4", default-features = false } regex = { version = "1.10", default-features = false } reqwest = { version = "0.12.11", features = ["json", "http2"], default-features = false } serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1.0", default-features = false, features = ["alloc"] } -thiserror = { version = "1.0", default-features = false } +thiserror = { version = "2.0", default-features = false } # Transitive dependency (pulled in via cookie). Pinned to >=0.3.47 so cargo audit / CI passes (RUSTSEC-2026-0009). time = { version = "0.3.47", default-features = false } tokio = { version = "1.47", default-features = false, features = ["macros", "rt-multi-thread", "time"] } @@ -46,7 +46,7 @@ rustls-webpki = { version = "0.103.13", default-features = false } rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] } rustls-pki-types = { version = "1.0", default-features = false } hyper-rustls = { version = "0.27.7", default-features = false } -rand = { version = "0.8", default-features = false } +rand = { version = "0.9", default-features = false } prost = { version = "0.14", default-features = false } tonic = { version = "0.14", features = ["transport", "codegen", "server", "channel", "router"], default-features = false } tonic-types = { version = "0.14", default-features = false } @@ -55,7 +55,7 @@ futures = { version = "0.3.31", default-features = false } serde-aux = { version = "4.7", default-features = false } serde_html_form = { version = "0.2", default-features = false } opentelemetry-proto = { version = "0.31.0", features = ["trace", "with-serde", "gen-tonic"] } -opentelemetry-semantic-conventions = { version = "0.30", features = ["semconv_experimental"] } +opentelemetry-semantic-conventions = { version = "0.31", features = ["semconv_experimental"] } # Pinned to <0.8.3: version 0.8.3 upgraded to openssl-probe 0.2.x which scans all cert # directories and parses ~200 individual cert files on Lambda instead of loading a single # bundle file, adding ~45ms to each reqwest::Client::build() call. @@ -129,7 +129,19 @@ tikv-jemallocator = "0.5" [features] default = [ - "reqwest/rustls-tls-native-roots", + # Use webpki-roots (compiled-in Mozilla CA bundle) rather than + # rustls-tls-native-roots for the non-FIPS reqwest clients. native-roots + # calls rustls_native_certs::load_native_certs() — a filesystem cert-bundle + # scan/parse — on *every* reqwest::Client::build(), and bottlecap builds two + # reqwest clients during cold-start init (the register client in + # bin/bottlecap/main.rs and the shared flush client in src/http.rs). + # webpki-roots removes that per-build scan. Trade-off: webpki trusts only the + # public Mozilla roots, so private/internal OS-installed CAs are no longer + # picked up implicitly; users relying on those must point DD_PROXY_HTTPS at a + # trusted intercept or supply the CA via tls_cert_file (which still layers on + # top via add_root_certificate in src/http.rs). FIPS builds keep native-roots + # (see the `fips` feature) and are unaffected. + "reqwest/rustls-tls-webpki-roots", "datadog-fips/default", "datadog-agent-config/https", "libdd-common/https", diff --git a/bottlecap/src/appsec/mod.rs b/bottlecap/src/appsec/mod.rs index 18755a04c..910f6a953 100644 --- a/bottlecap/src/appsec/mod.rs +++ b/bottlecap/src/appsec/mod.rs @@ -1,9 +1,37 @@ use std::env; +use std::sync::Arc; +use tokio::sync::{Mutex, OnceCell}; +use tracing::error; + +use crate::appsec::processor::{Error as AppSecError, Processor}; use crate::config::Config; pub mod processor; +/// A [`Processor`] shared across the trace agent and the runtime API proxy. +/// +/// Wrapped in a [`Mutex`] (and an [`Arc`] so the value stays [`Send`]) because +/// the WAF context buffer is mutated from multiple asynchronous tasks. +pub type SharedProcessor = Arc>; + +/// A handle to a [`SharedProcessor`] whose construction is deferred off the +/// initialization critical path. +/// +/// Building the WAF (zstd-decompressing and JSON-parsing the ruleset, then +/// compiling it through `libddwaf`) costs tens of milliseconds and is only +/// needed once the first request payload is evaluated — which is strictly after +/// the first `/next`. Rather than block init on that work, consumers hold this +/// awaitable handle and resolve it (via [`resolve`]) at the point where they +/// actually need the WAF. +/// +/// The outer [`Option`] (see [`defer_processor`]) distinguishes the +/// feature-disabled case (no handle at all) from the enabled case. The inner +/// [`Option`] distinguishes a successfully built processor (`Some`) from a build +/// that failed (`None`), in which case the feature is treated as a no-op exactly +/// as it would have been with the previous eager construction. +pub type DeferredProcessor = Arc>>; + /// Determines whether the Serverless App & API Protection features are enabled. #[must_use] pub const fn is_enabled(cfg: &Config) -> bool { @@ -16,3 +44,72 @@ pub const fn is_enabled(cfg: &Config) -> bool { pub fn is_standalone() -> bool { env::var("DD_APM_TRACING_ENABLED").is_ok_and(|s| s.to_lowercase() == "true") } + +/// Prepares a [`DeferredProcessor`] for the App & API Protection feature without +/// blocking the caller on the (CPU-bound) WAF build. +/// +/// Returns [`None`] immediately when the feature is disabled, preserving the +/// cheap, synchronous disabled-by-default path. When enabled, a background task +/// is spawned to build the processor off the critical path, and a handle that +/// resolves to it is returned right away. Consumers call [`resolve`] where they +/// use the WAF; if a request arrives before the background build has finished, +/// that call simply awaits the in-flight build (or kicks it off itself). +#[must_use] +pub fn defer_processor(cfg: &Arc) -> Option { + if !is_enabled(cfg) { + // Feature disabled: nothing to build, and nothing to await. + return None; + } + + let cell: DeferredProcessor = Arc::new(OnceCell::new()); + + // Kick the build off the synchronous init path. The first consumer to call + // `resolve` will await whatever this task produces (or, if it somehow races + // ahead, run an equivalent build itself via `get_or_init`). + let background_cell = Arc::clone(&cell); + let background_cfg = Arc::clone(cfg); + tokio::spawn(async move { + let _ = background_cell + .get_or_init(|| build_processor(background_cfg)) + .await; + }); + + Some(cell) +} + +/// Resolves a [`DeferredProcessor`] to the underlying [`SharedProcessor`], if the +/// WAF was built successfully. +/// +/// Awaits the background build started by [`defer_processor`] when it is still in +/// flight; if no build is running yet, it starts one (off-thread, CPU-bound work +/// runs on the blocking pool). Subsequent calls return immediately. +pub async fn resolve(handle: &DeferredProcessor, cfg: &Arc) -> Option { + handle + .get_or_init(|| build_processor(Arc::clone(cfg))) + .await + .clone() +} + +/// Builds the App & API Protection [`Processor`] on the blocking thread pool. +/// +/// The WAF build is CPU-bound (ruleset decompression, JSON parsing, and WAF +/// compilation), so it is offloaded with [`tokio::task::spawn_blocking`] to keep +/// it off the async worker threads. Returns [`None`] (logging at the appropriate +/// level) when the feature is disabled or the build fails, matching the previous +/// "feature is silently a no-op" behaviour. +async fn build_processor(cfg: Arc) -> Option { + match tokio::task::spawn_blocking(move || Processor::new(&cfg)).await { + Ok(Ok(processor)) => Some(Arc::new(Mutex::new(processor))), + Ok(Err(AppSecError::FeatureDisabled)) => None, + Ok(Err(e)) => { + error!( + "AAP | error creating App & API Protection processor, the feature will be disabled: {e}" + ); + None + } + Err(e) => { + error!("AAP | App & API Protection processor build task failed to join: {e}"); + None + } + } +} diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 39f352149..fe503fb78 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -18,9 +18,7 @@ static GLOBAL: Jemalloc = Jemalloc; use bottlecap::{ DOGSTATSD_PORT, LAMBDA_RUNTIME_SLUG, - appsec::processor::{ - Error::FeatureDisabled as AppSecFeatureDisabled, Processor as AppSecProcessor, - }, + appsec::{self, DeferredProcessor as AppSecProcessor}, config::{ self, Config, aws::{AwsConfig, build_lambda_function_arn}, @@ -97,6 +95,7 @@ use dogstatsd::{ }; use libdd_trace_obfuscation::obfuscation_config; use reqwest::Client; +use std::sync::atomic::{AtomicU64, Ordering}; use std::{collections::hash_map, env, path::Path, str::FromStr, sync::Arc}; use tokio::time::Instant; use tokio::{sync::Mutex as TokioMutex, sync::mpsc::Sender}; @@ -105,8 +104,22 @@ use tracing::{debug, error, warn}; use tracing_subscriber::EnvFilter; use ustr::Ustr; -#[tokio::main] -async fn main() -> anyhow::Result<()> { +fn main() -> anyhow::Result<()> { + // Right-size the Tokio worker pool to the Lambda memory tier instead of letting + // `#[tokio::main]` default to `available_parallelism()`. AWS scales a function's + // vCPU allotment linearly with memory, granting one full vCPU at 1769 MB, so we + // map memory -> workers with integer math (no float casts that would trip + // clippy::pedantic) and clamp to a sane 1..=4 range. Pairs with the H0 + // `available_parallelism` log so the chosen worker count can be benchmarked. + let worker_threads = tokio_worker_threads(); + tokio::runtime::Builder::new_multi_thread() + .worker_threads(worker_threads) + .enable_all() + .build()? + .block_on(run()) +} + +async fn run() -> anyhow::Result<()> { let start_time = Instant::now(); init_ustr(); enable_logging_subsystem(); @@ -114,6 +127,12 @@ async fn main() -> anyhow::Result<()> { log_fips_status(&aws_config.region); let version_without_next = EXTENSION_VERSION.split('-').next().unwrap_or("NA"); debug!("Starting Datadog Extension v{version_without_next}"); + // Tokio sizes its worker pool from available_parallelism(); logging it makes the + // value the runtime actually sees observable per memory tier (see cold-start H15). + debug!( + "INIT | available_parallelism={:?}", + std::thread::available_parallelism() + ); // Debug: Wait for debugger to attach if DD_DEBUG_WAIT_FOR_ATTACH is set if let Ok(wait_secs) = env::var("DD_DEBUG_WAIT_FOR_ATTACH") @@ -126,40 +145,65 @@ async fn main() -> anyhow::Result<()> { } prepare_client_provider()?; - let client = create_reqwest_client_builder() - .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e:?}"))? - .no_proxy() - .build() - .map_err(|e| anyhow::anyhow!("Failed to create client: {e:?}"))?; - - let cloned_client = client.clone(); + // Separate checkpoint so the next difference isolates the TLS client build from + // crypto-provider setup (the latter is non-trivial in FIPS builds). + log_init_checkpoint(start_time, "crypto_provider_ready"); + + // Build the register/`/next` reqwest client and register the extension on a + // background task. Both the TLS client build (which loads native root certs) + // and the register network round-trip then overlap with config parsing and + // the shared client build on the main task below, instead of running + // serially. The task hands back the client because the `/next` long-poll + // reuses it for the lifetime of the extension. + // + // This client is kept separate from the shared flushing client on purpose: + // the Extension API register + `/next` long-poll must NOT route through + // `DD_PROXY_HTTPS` (hence `.no_proxy()`) and must NOT carry the shared + // client's `flush_timeout` (which would abort the long-poll that blocks + // until the next invocation). Those needs conflict, so the two clients + // cannot be collapsed into one — we overlap their construction instead. let runtime_api = aws_config.runtime_api.clone(); let managed_instance_mode = aws_config.is_managed_instance_mode(); - let response = tokio::task::spawn(async move { - extension::register( - &cloned_client, + let register_handle = tokio::task::spawn(async move { + let client = tokio::task::spawn_blocking(|| { + create_reqwest_client_builder() + .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e:?}"))? + .no_proxy() + .build() + .map_err(|e| anyhow::anyhow!("Failed to create client: {e:?}")) + }) + .await + .map_err(|e| anyhow::anyhow!("Failed to join client build task: {e:?}"))??; + let response = extension::register( + &client, &runtime_api, extension::EXTENSION_NAME, managed_instance_mode, ) .await + .map_err(|e| anyhow::anyhow!("Failed to register extension: {e:?}"))?; + Ok::<_, anyhow::Error>((client, response)) }); + // First load the AWS configuration let lambda_directory: String = env::var("LAMBDA_TASK_ROOT").unwrap_or_else(|_| "/var/task".to_string()); let config = Arc::new(config::get_config(Path::new(&lambda_directory))); + log_init_checkpoint(start_time, "config_parse"); let aws_config = Arc::new(aws_config); // Build one shared reqwest::Client for metrics, logs, trace proxy flushing, and calls to // Datadog APIs (e.g. delegated auth). reqwest::Client is Arc-based internally, so cloning - // just increments a refcount and shares the connection pool. + // just increments a refcount and shares the connection pool. Its TLS build overlaps with + // the register client build kicked off above. let shared_client = bottlecap::http::get_client(&config); let api_key_factory = create_api_key_factory(&config, &aws_config, &shared_client); + log_init_checkpoint(start_time, "shared_client_ready"); - let r = response + let (client, r) = register_handle .await - .map_err(|e| anyhow::anyhow!("Failed to join task: {e:?}"))? - .map_err(|e| anyhow::anyhow!("Failed to register extension: {e:?}"))?; + .map_err(|e| anyhow::anyhow!("Failed to join task: {e:?}"))??; + log_init_checkpoint(start_time, "register_ready"); match extension_loop_active( Arc::clone(&aws_config), @@ -183,6 +227,30 @@ async fn main() -> anyhow::Result<()> { } } +/// Derives the Tokio worker-thread count from the Lambda memory tier exposed via +/// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE` (megabytes). AWS allocates ~1 vCPU per +/// 1769 MB, so `workers = round(mem_mb / 1769)` approximates the vCPUs the +/// sandbox actually grants; the result is clamped to `1..=4`. Uses integer math +/// — `(mem_mb + 884) / 1769` is `round(mem_mb / 1769)` — to avoid float casts. +/// Falls back to 2 workers when the variable is absent or unparseable. +fn tokio_worker_threads() -> usize { + const MB_PER_VCPU: u32 = 1769; + const DEFAULT_WORKERS: usize = 2; + const MAX_WORKERS: usize = 4; + + match env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") + .ok() + .and_then(|mem| mem.trim().parse::().ok()) + { + Some(mem_mb) => { + // round(mem_mb / 1769) via integer arithmetic, then clamp to 1..=4. + let workers = (mem_mb + MB_PER_VCPU / 2) / MB_PER_VCPU; + (workers as usize).clamp(1, MAX_WORKERS) + } + None => DEFAULT_WORKERS, + } +} + // Ustr initialization can take 10+ ms. // Start it early in a separate thread so it won't become a bottleneck later when SortedTags::parse() is called. fn init_ustr() { @@ -191,6 +259,34 @@ fn init_ustr() { }); } +/// Cumulative nanoseconds (since process start) recorded at the previous init +/// checkpoint, so each checkpoint can report its own per-phase delta without the +/// reader having to subtract consecutive lines by hand. +static LAST_INIT_CHECKPOINT_NS: AtomicU64 = AtomicU64::new(0); + +/// Records a cold-start init checkpoint at `phase`, logging both `delta` (time +/// since the previous checkpoint, i.e. this phase's own cost) and `cumulative` +/// (time since process start), in milliseconds to 6 decimal places (nanosecond +/// resolution) so sub-millisecond phases are still visible. This attributes init +/// time per phase so the cold-start optimizations can be measured, not guessed. +/// +/// The bookkeeping is guarded behind a `DEBUG`-level check, so it stays +/// effectively free at the default `info` level (no clock read, no logging). +fn log_init_checkpoint(start_time: Instant, phase: &str) { + if !tracing::enabled!(tracing::Level::DEBUG) { + return; + } + let cumulative = start_time.elapsed(); + let cumulative_ns = u64::try_from(cumulative.as_nanos()).unwrap_or(u64::MAX); + let previous_ns = LAST_INIT_CHECKPOINT_NS.swap(cumulative_ns, Ordering::Relaxed); + let delta = std::time::Duration::from_nanos(cumulative_ns.saturating_sub(previous_ns)); + debug!( + "INIT | phase={phase} delta={:.6}ms cumulative={:.6}ms", + delta.as_secs_f64() * 1000.0, + cumulative.as_secs_f64() * 1000.0 + ); +} + fn enable_logging_subsystem() { let log_level = LogLevel::from_str( std::env::var("DD_LOG_LEVEL") @@ -328,71 +424,26 @@ async fn extension_loop_active( &shared_client, ) .await; + log_init_checkpoint(start_time, "dogstatsd_started"); let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(config))); - // Lifecycle Invocation Processor - let (invocation_processor_handle, invocation_processor_service) = - InvocationProcessorService::new( - Arc::clone(&tags_provider), - Arc::clone(config), - Arc::clone(&aws_config), - metrics_aggregator_handle.clone(), - Arc::clone(&propagator), - durable_context_tx, - ); - tokio::spawn(async move { - invocation_processor_service.run().await; - }); - - // AppSec processor (if enabled) - let appsec_processor = match AppSecProcessor::new(config) { - Ok(p) => Some(Arc::new(TokioMutex::new(p))), - Err(AppSecFeatureDisabled) => None, - Err(e) => { - error!( - "AAP | error creating App & API Protection processor, the feature will be disabled: {e}" - ); - None - } - }; - - let ( - trace_agent_channel, - trace_flusher, - trace_processor, - stats_flusher, - proxy_flusher, - trace_agent_shutdown_token, - stats_concentrator, - trace_aggregator_handle, - ) = start_trace_agent( - config, - &api_key_factory, - &tags_provider, - invocation_processor_handle.clone(), - appsec_processor.clone(), - &shared_client, - ); - - let api_runtime_proxy_shutdown_signal = start_api_runtime_proxy( - config, - Arc::clone(&aws_config), - &invocation_processor_handle, - appsec_processor.as_ref(), - Arc::clone(&propagator), - ); - let lifecycle_listener = - LifecycleListener::new(invocation_processor_handle.clone(), Arc::clone(&propagator)); - let lifecycle_listener_shutdown_token = lifecycle_listener.get_shutdown_token(); - // TODO(astuyve): deprioritize this task after the first request - tokio::spawn(async move { - if let Err(e) = lifecycle_listener.start().await { - error!("Error starting lifecycle listener: {e:?}"); - } - }); - - let telemetry_listener_cancel_token = setup_telemetry_client( + // The Lambda Extensions API ends the INIT phase at the first `/next` call, so the + // serialized critical path before that call directly inflates cold-start time. The + // telemetry subscription only depends on `logs_agent_channel` (already available), + // and its `subscribe` HTTP round-trip is independent of constructing the trace agent, + // AppSec, API-runtime proxy, and lifecycle listener below. We therefore issue the + // subscribe and run that remaining (synchronous) service construction concurrently + // via `tokio::join!`: the subscribe future is polled first so its network round-trip + // is in flight while construction proceeds, instead of being serialized behind it. + // + // Correctness: `setup_telemetry_client` binds the telemetry listener socket + // (synchronously, before `subscribe` returns — see `TelemetryListener::start`) prior + // to subscribing, so the listener is already accepting connections when the Telemetry + // API begins delivering events. No early `platform.initStart`/`initReport` or logs are + // dropped. The construction branch only spawns background tasks and returns handles; + // it performs no I/O that the subscribe depends on, so the two are order-independent. + let telemetry_setup = setup_telemetry_client( client, &r.extension_id, &aws_config.runtime_api, @@ -400,8 +451,88 @@ async fn extension_loop_active( event_bus_tx.clone(), config.ext.serverless_logs_enabled, aws_config.is_managed_instance_mode(), - ) - .await?; + ); + + let build_services = async { + // Lifecycle Invocation Processor + let (invocation_processor_handle, invocation_processor_service) = + InvocationProcessorService::new( + Arc::clone(&tags_provider), + Arc::clone(config), + Arc::clone(&aws_config), + metrics_aggregator_handle.clone(), + Arc::clone(&propagator), + durable_context_tx, + ); + tokio::spawn(async move { + invocation_processor_service.run().await; + }); + + // AppSec processor (if enabled). The WAF build (ruleset decompression, JSON + // parsing, and `libddwaf` compilation) is CPU-bound and only needed once the + // first request payload is evaluated, which is strictly after the first + // `/next`. Defer it off the init critical path: consumers receive an + // awaitable handle and resolve it where they actually use the WAF. + let appsec_processor = appsec::defer_processor(config); + + let trace_agent_bundle = start_trace_agent( + config, + &api_key_factory, + &tags_provider, + invocation_processor_handle.clone(), + appsec_processor.clone(), + &shared_client, + ); + log_init_checkpoint(start_time, "trace_agent_started"); + + let api_runtime_proxy_shutdown_signal = start_api_runtime_proxy( + config, + Arc::clone(&aws_config), + &invocation_processor_handle, + appsec_processor.as_ref(), + Arc::clone(&propagator), + ); + + let lifecycle_listener = + LifecycleListener::new(invocation_processor_handle.clone(), Arc::clone(&propagator)); + let lifecycle_listener_shutdown_token = lifecycle_listener.get_shutdown_token(); + // TODO(astuyve): deprioritize this task after the first request + tokio::spawn(async move { + if let Err(e) = lifecycle_listener.start().await { + error!("Error starting lifecycle listener: {e:?}"); + } + }); + + ( + invocation_processor_handle, + appsec_processor, + trace_agent_bundle, + api_runtime_proxy_shutdown_signal, + lifecycle_listener_shutdown_token, + ) + }; + + let ( + telemetry_listener_cancel_token, + ( + invocation_processor_handle, + appsec_processor, + ( + trace_agent_channel, + trace_flusher, + trace_processor, + stats_flusher, + proxy_flusher, + trace_agent_shutdown_token, + stats_concentrator, + trace_aggregator_handle, + ), + api_runtime_proxy_shutdown_signal, + lifecycle_listener_shutdown_token, + ), + ) = tokio::join!(telemetry_setup, build_services); + let telemetry_listener_cancel_token = telemetry_listener_cancel_token?; + log_init_checkpoint(start_time, "telemetry_subscribed"); let otlp_cancel_token = start_otlp_agent( config, @@ -421,6 +552,8 @@ async fn extension_loop_active( "Datadog Next-Gen Extension ready in {:}ms", start_time.elapsed().as_millis().to_string() ); + // Terminal checkpoint so the whole init timeline is one greppable "INIT |" series. + log_init_checkpoint(start_time, "ready"); if aws_config.is_managed_instance_mode() { // Clone Arc references for the background flusher task @@ -778,7 +911,7 @@ async fn extension_loop_active( async fn wait_for_tombstone_event( event_bus: &mut EventBus, invocation_processor_handle: &InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, tags_provider: Arc, trace_processor: Arc, trace_agent_channel: Sender, @@ -835,7 +968,7 @@ fn cancel_background_services( async fn handle_event_bus_event( event: Event, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, tags_provider: Arc, trace_processor: Arc, trace_agent_channel: Sender, @@ -1102,7 +1235,7 @@ fn start_trace_agent( api_key_factory: &Arc, tags_provider: &Arc, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, client: &Client, ) -> ( Sender, @@ -1370,7 +1503,7 @@ async fn setup_telemetry_client( let listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_tx, event_bus_tx); let cancel_token = listener.cancel_token(); - match listener.start() { + match listener.start().await { Ok(()) => { // Drop the listener, so event_bus_tx is closed drop(listener); @@ -1430,7 +1563,7 @@ fn start_api_runtime_proxy( config: &Arc, aws_config: Arc, invocation_processor_handle: &InvocationProcessorHandle, - appsec_processor: Option<&Arc>>, + appsec_processor: Option<&AppSecProcessor>, propagator: Arc, ) -> Option { if !should_start_proxy(config, Arc::clone(&aws_config)) { @@ -1444,6 +1577,7 @@ fn start_api_runtime_proxy( invocation_processor_handle.clone(), appsec_processor, propagator, + Arc::clone(config), ) .ok() } diff --git a/bottlecap/src/extension/telemetry/listener.rs b/bottlecap/src/extension/telemetry/listener.rs index 68349374f..f06d64404 100644 --- a/bottlecap/src/extension/telemetry/listener.rs +++ b/bottlecap/src/extension/telemetry/listener.rs @@ -49,17 +49,27 @@ impl TelemetryListener { self.cancel_token.clone() } - pub fn start(&self) -> Result<(), Box> { + /// Binds the telemetry listener socket and starts serving in a background task. + /// + /// The bind happens synchronously (before this method returns) so the socket is + /// already accepting connections by the time the caller subscribes to the Lambda + /// Telemetry API. This makes listener readiness deterministic and avoids dropping + /// early `platform.initStart`/`initReport` events delivered immediately after + /// subscription. + /// + /// # Errors + /// + /// Returns an error if the listener socket cannot be bound. + pub async fn start(&self) -> Result<(), Box> { let socket = SocketAddr::from((self.host, self.port)); let router = self.make_router(); + let listener = TcpListener::bind(&socket).await?; + debug!("TELEMETRY API | Starting listener on {}", socket); + let cancel_token_clone = self.cancel_token(); let event_bus_tx = self.event_bus_tx.clone(); tokio::spawn(async move { - let listener = TcpListener::bind(&socket) - .await - .expect("Failed to bind socket"); - debug!("TELEMETRY API | Starting listener on {}", socket); axum::serve(listener, router) .with_graceful_shutdown(Self::graceful_shutdown(cancel_token_clone, event_bus_tx)) .await diff --git a/bottlecap/src/lifecycle/invocation/mod.rs b/bottlecap/src/lifecycle/invocation/mod.rs index 366883966..e98d12f20 100644 --- a/bottlecap/src/lifecycle/invocation/mod.rs +++ b/bottlecap/src/lifecycle/invocation/mod.rs @@ -1,6 +1,7 @@ use base64::{DecodeError, Engine, engine::general_purpose}; use libdd_trace_protobuf::pb::Span; -use rand::{Rng, RngCore, rngs::OsRng}; +use rand::rngs::OsRng; +use rand::{Rng, TryRngCore}; use std::collections::HashMap; use crate::tags::lambda::tags::{INIT_TYPE, SNAP_START_VALUE}; @@ -51,11 +52,16 @@ fn create_empty_span(name: String, resource: &str, service: &str) -> Span { #[must_use] pub fn generate_span_id() -> u64 { if std::env::var(INIT_TYPE).is_ok_and(|it| it == SNAP_START_VALUE) { - return OsRng.next_u64(); + // SnapStart restores from a snapshot, so seed directly from OS entropy to + // avoid reusing the snapshotted RNG state. Fall back to the thread RNG on + // the (effectively impossible) OS RNG failure rather than panicking. + if let Ok(id) = OsRng.try_next_u64() { + return id; + } } - let mut rng = rand::thread_rng(); - rng.r#gen() + let mut rng = rand::rng(); + rng.random() } fn redact_value(key: &str, value: String) -> String { diff --git a/bottlecap/src/proc/mod.rs b/bottlecap/src/proc/mod.rs index 30dc43e5b..53dca9386 100644 --- a/bottlecap/src/proc/mod.rs +++ b/bottlecap/src/proc/mod.rs @@ -5,6 +5,7 @@ use std::{ collections::HashMap, fs::{self, File}, io::{self, BufRead}, + sync::LazyLock, }; use constants::{ @@ -14,6 +15,14 @@ use constants::{ use regex::Regex; use tracing::debug; +// Compiled once on first use rather than on every call. Both patterns capture +// the soft limit value (the first numeric value after the title) from a +// process `limits` file. +static MAX_OPEN_FILES_RE: LazyLock = + LazyLock::new(|| Regex::new(r"^Max open files\s+(\d+)").expect("valid static regex")); +static MAX_PROCESSES_RE: LazyLock = + LazyLock::new(|| Regex::new(r"^Max processes\s+(\d+)").expect("valid static regex")); + #[must_use] pub fn get_pid_list() -> Vec { get_pid_list_from_path(PROC_PATH) @@ -222,8 +231,7 @@ pub fn get_fd_max_data(pids: &[i64]) -> f64 { fn get_fd_max_data_from_path(path: &str, pids: &[i64]) -> f64 { let mut fd_max = constants::LAMBDA_FILE_DESCRIPTORS_DEFAULT_LIMIT; - // regex to capture the soft limit value (first numeric value after the title) - let re = Regex::new(r"^Max open files\s+(\d+)").expect("Failed to create regex"); + let re = &*MAX_OPEN_FILES_RE; for &pid in pids { let limits_path = format!("{path}/{pid}/limits"); @@ -273,8 +281,7 @@ pub fn get_threads_max_data(pids: &[i64]) -> f64 { fn get_threads_max_data_from_path(path: &str, pids: &[i64]) -> f64 { let mut threads_max = constants::LAMBDA_EXECUTION_PROCESSES_DEFAULT_LIMIT; - // regex to capture the soft limit value (first numeric value after the title) - let re = Regex::new(r"^Max processes\s+(\d+)").expect("Failed to create regex"); + let re = &*MAX_PROCESSES_RE; for &pid in pids { let limits_path = format!("{path}/{pid}/limits"); diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index 21a018377..c8d0270dc 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -2,8 +2,12 @@ use crate::lifecycle::invocation::processor_service::InvocationProcessorHandle; use crate::lifecycle::invocation::triggers::IdentifiedTrigger; use crate::traces::propagation::DatadogCompositePropagator; use crate::{ - appsec::processor::Processor as AppSecProcessor, config::aws::AwsConfig, - extension::EXTENSION_HOST, lwa, proxy::tee_body::TeeBodyWithCompletion, + appsec::{self, DeferredProcessor as AppSecProcessor}, + config::Config, + config::aws::AwsConfig, + extension::EXTENSION_HOST, + lwa, + proxy::tee_body::TeeBodyWithCompletion, }; use axum::{ Router, @@ -32,16 +36,18 @@ type InterceptorState = ( Arc, Arc>, InvocationProcessorHandle, - Option>>, + Option, Arc, Arc>>, + Arc, ); pub fn start( aws_config: Arc, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, propagator: Arc, + config: Arc, ) -> Result> { let socket = get_proxy_socket_address(aws_config.aws_lwa_proxy_lambda_runtime_api.as_ref()); let shutdown_token = CancellationToken::new(); @@ -62,6 +68,7 @@ pub fn start( appsec_processor, propagator, tasks.clone(), + config, ); let tasks_clone = tasks.clone(); @@ -133,9 +140,15 @@ fn get_proxy_socket_address(aws_lwa_proxy_lambda_runtime_api: Option<&String>) - async fn invocation_next_proxy( Path(api_version): Path, - State((aws_config, client, invocation_processor, appsec_processor, propagator, tasks)): State< - InterceptorState, - >, + State(( + aws_config, + client, + invocation_processor, + appsec_processor, + propagator, + tasks, + config, + )): State, request: Request, ) -> Response { debug!("PROXY | invocation_next_proxy | api_version: {api_version}"); @@ -179,20 +192,23 @@ async fn invocation_next_proxy( if let Ok(body) = intercepted_completion_receiver.await { debug!("PROXY | invocation_next_proxy | intercepted body completed"); - if let Some(appsec_processor) = appsec_processor + if let Some(appsec_handle) = &appsec_processor && let Some(request_id) = intercepted_parts_clone .headers .get("Lambda-Runtime-Aws-Request-Id") .and_then(|v| v.to_str().ok()) + && let Ok(trigger) = IdentifiedTrigger::from_slice(&body) { - { - if let Ok(trigger) = IdentifiedTrigger::from_slice(&body) { - appsec_processor - .lock() - .await - .process_invocation_next(request_id, &trigger) - .await; - } + // Resolve the deferred WAF handle, awaiting its off-critical-path + // build if it has not finished yet. This is the first request + // payload to be evaluated, so it is the earliest point the WAF is + // actually needed. + if let Some(appsec_processor) = appsec::resolve(appsec_handle, &config).await { + appsec_processor + .lock() + .await + .process_invocation_next(request_id, &trigger) + .await; } } @@ -223,7 +239,7 @@ async fn invocation_next_proxy( async fn invocation_response_proxy( Path((api_version, request_id)): Path<(String, String)>, - State((aws_config, client, invocation_processor, appsec_processor, _, tasks)): State< + State((aws_config, client, invocation_processor, appsec_processor, _, tasks, config)): State< InterceptorState, >, request: Request, @@ -241,7 +257,9 @@ async fn invocation_response_proxy( join_set.spawn(async move { if let Ok(body) = outgoing_completion_receiver.await { debug!("PROXY | invocation_response_proxy | intercepted outgoing body completed"); - if let Some(appsec_processor) = appsec_processor { + if let Some(appsec_handle) = &appsec_processor + && let Some(appsec_processor) = appsec::resolve(appsec_handle, &config).await + { appsec_processor .lock() .await @@ -304,8 +322,10 @@ async fn invocation_error_proxy( request: Request, ) -> Response { debug!("PROXY | invocation_error_proxy | api_version: {api_version}, request_id: {request_id}"); - let State((_, _, _, appsec_processor, _, _)) = &state; - if let Some(appsec_processor) = appsec_processor { + let State((_, _, _, appsec_processor, _, _, config)) = &state; + if let Some(appsec_handle) = appsec_processor + && let Some(appsec_processor) = appsec::resolve(appsec_handle, config).await + { // Marking any outstanding security context as finalized by sending a blank response. appsec_processor .lock() @@ -318,7 +338,7 @@ async fn invocation_error_proxy( } async fn passthrough_proxy( - State((aws_config, client, _, _, _, _)): State, + State((aws_config, client, _, _, _, _, _)): State, request: Request, ) -> Response { let (parts, body) = request.into_parts(); @@ -434,7 +454,7 @@ where mod tests { use http_body_util::BodyExt; use std::{collections::HashMap, time::Duration}; - use tokio::{sync::Mutex as TokioMutex, time::Instant}; + use tokio::time::Instant; use dogstatsd::{aggregator::AggregatorService, metric::EMPTY_TAGS}; use http_body_util::Full; @@ -444,8 +464,8 @@ mod tests { use super::*; use crate::lifecycle::invocation::processor_service::InvocationProcessorService; use crate::{ - LAMBDA_RUNTIME_SLUG, appsec::processor::Error::FeatureDisabled as AppSecFeatureDisabled, - config::Config, tags::provider::Provider, traces::propagation::DatadogCompositePropagator, + LAMBDA_RUNTIME_SLUG, config::Config, tags::provider::Provider, + traces::propagation::DatadogCompositePropagator, }; #[tokio::test] @@ -513,22 +533,16 @@ mod tests { invocation_processor_service.run().await; }); - let appsec_processor = match AppSecProcessor::new(&config) { - Ok(p) => Some(Arc::new(TokioMutex::new(p))), - Err(AppSecFeatureDisabled) => None, - Err(e) => { - error!( - "PROXY | aap | error creating App & API Protection processor, the feature will be disabled: {e}" - ); - None - } - }; + // App & API Protection is disabled in the default config, so this + // resolves to `None` synchronously (no WAF build is spawned). + let appsec_processor = appsec::defer_processor(&config); let proxy_handle = start( aws_config, invocation_processor_handle, appsec_processor, propagator, + Arc::clone(&config), ) .expect("Failed to start API runtime proxy"); let https = HttpConnector::new(); diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs index b80574944..7143736b1 100644 --- a/bottlecap/src/tags/lambda/tags.rs +++ b/bottlecap/src/tags/lambda/tags.rs @@ -65,6 +65,12 @@ const RESOURCE_KEY: &str = "resource"; #[derive(Debug, Clone)] pub struct Lambda { tags_map: HashMap, + // Cached representations computed once at construction so that the hot + // getters below are O(1) reads instead of re-iterating `tags_map` and + // re-running `format!("{k}:{v}")`/`join(",")` on every call. + tags_vec: Vec, + tags_string: String, + function_tags_map: HashMap, } fn arch_to_platform<'a>() -> &'a str { @@ -227,17 +233,29 @@ impl Lambda { config: Arc, metadata: &HashMap, ) -> Self { + let tags_map = tags_from_env(HashMap::new(), config, metadata); + // Compute the derived representations once. The tag set is immutable + // after construction, so callers can read these cached values cheaply. + let tags_vec: Vec = tags_map.iter().map(|(k, v)| format!("{k}:{v}")).collect(); + let tags_string = tags_vec.join(","); + let function_tags_map = + HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags_string.clone())]); Lambda { - tags_map: tags_from_env(HashMap::new(), config, metadata), + tags_map, + tags_vec, + tags_string, + function_tags_map, } } #[must_use] pub fn get_tags_vec(&self) -> Vec { - self.tags_map - .iter() - .map(|(k, v)| format!("{k}:{v}")) - .collect() + self.tags_vec.clone() + } + + #[must_use] + pub fn get_tags_string(&self) -> &str { + &self.tags_string } #[must_use] @@ -257,13 +275,7 @@ impl Lambda { #[must_use] pub fn get_function_tags_map(&self) -> HashMap { - let tags = self - .tags_map - .iter() - .map(|(k, v)| format!("{k}:{v}")) - .collect::>() - .join(","); - HashMap::from_iter([(FUNCTION_TAGS_KEY.to_string(), tags)]) + self.function_tags_map.clone() } } diff --git a/bottlecap/src/tags/provider.rs b/bottlecap/src/tags/provider.rs index e85609967..dbc2d26e3 100644 --- a/bottlecap/src/tags/provider.rs +++ b/bottlecap/src/tags/provider.rs @@ -39,7 +39,7 @@ impl Provider { #[must_use] pub fn get_tags_string(&self) -> String { - self.get_tags_vec().join(",") + self.tag_provider.get_tags_string().to_string() } #[must_use] @@ -65,6 +65,7 @@ impl Provider { trait GetTags { fn get_tags_vec(&self) -> Vec; + fn get_tags_string(&self) -> &str; fn get_canonical_id(&self) -> Option; fn get_canonical_resource_name(&self) -> Option; fn get_tags_map(&self) -> &hash_map::HashMap; @@ -78,6 +79,12 @@ impl GetTags for TagProvider { } } + fn get_tags_string(&self) -> &str { + match self { + TagProvider::Lambda(lambda_tags) => lambda_tags.get_tags_string(), + } + } + fn get_canonical_id(&self) -> Option { match self { TagProvider::Lambda(lambda_tags) => lambda_tags.get_function_arn().cloned(), diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 37b71d29f..537302737 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -23,7 +23,7 @@ use tracing::{debug, error, warn}; use crate::traces::trace_processor::SendingTraceProcessor; use crate::{ - appsec::processor::Processor as AppSecProcessor, + appsec::DeferredProcessor as AppSecProcessor, config, http::{extract_request_body, handler_not_found}, lifecycle::invocation::{ @@ -113,7 +113,7 @@ pub struct TraceAgent { pub proxy_aggregator: Arc>, pub tags_provider: Arc, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, shutdown_token: CancellationToken, tx: Sender, stats_concentrator: StatsConcentratorHandle, @@ -137,7 +137,7 @@ impl TraceAgent { stats_processor: Arc, proxy_aggregator: Arc>, invocation_processor_handle: InvocationProcessorHandle, - appsec_processor: Option>>, + appsec_processor: Option, tags_provider: Arc, stats_concentrator: StatsConcentratorHandle, span_deduper: DedupHandle, diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 256d8bf14..81da39bd9 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -3,6 +3,7 @@ use crate::appsec::processor::Processor as AppSecProcessor; use crate::appsec::processor::context::HoldArguments; +use crate::appsec::{self, DeferredProcessor as AppSecDeferredProcessor}; use crate::config; use crate::lifecycle::invocation::processor::S_TO_MS; use crate::lifecycle::invocation::triggers::get_default_service_name; @@ -28,7 +29,6 @@ use libdd_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollec use regex::Regex; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::Mutex; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; @@ -498,9 +498,10 @@ impl TraceProcessor for ServerlessTraceProcessor { /// Once ready to flush, the traces are submitted to the provided [`Sender`]. #[derive(Clone)] pub struct SendingTraceProcessor { - /// The [`AppSecProcessor`] to use for security-processing the traces, if - /// configured. - pub appsec: Option>>, + /// A deferred handle to the [`AppSecProcessor`] to use for + /// security-processing the traces, if the feature is enabled. The handle is + /// resolved (awaiting the off-critical-path WAF build) at the point of use. + pub appsec: Option, /// The [`TraceProcessor`] to use for transforming raw traces into /// [`SendDataBuilderInfo`]s before flushing. pub processor: Arc, @@ -522,7 +523,13 @@ impl SendingTraceProcessor { body_size: usize, span_pointers: Option>, ) -> Result<(), SendError> { - traces = if let Some(appsec) = &self.appsec { + // Resolve the deferred AppSec handle (awaiting the off-critical-path WAF + // build if it is still in flight) only when the feature is enabled. + let appsec = match &self.appsec { + Some(handle) => appsec::resolve(handle, &config).await, + None => None, + }; + traces = if let Some(appsec) = &appsec { let mut appsec = appsec.lock().await; traces.into_iter().filter_map(|mut trace| { let Some(span) = AppSecProcessor::service_entry_span_mut(&mut trace) else { diff --git a/images/Dockerfile.bottlecap.alpine.compile b/images/Dockerfile.bottlecap.alpine.compile index 9858c5662..52cdcc3f7 100644 --- a/images/Dockerfile.bottlecap.alpine.compile +++ b/images/Dockerfile.bottlecap.alpine.compile @@ -17,13 +17,17 @@ RUN set -euo pipefail && \ curl https://sh.rustup.rs -sSf | \ sh -s -- --profile minimal \ --default-host "${PLATFORM}-unknown-linux-musl" \ - --default-toolchain "stable-${PLATFORM}-unknown-linux-musl" \ - --component rust-src \ + --default-toolchain none \ -y ENV PATH="${PATH}:/root/.cargo/bin" # Build the binary ENV RUSTFLAGS="-Cpanic=abort" +# tikv-jemallocator uses the `_rjem_` symbol prefix, so the runtime MALLOC_CONF +# env var is ignored. Tune jemalloc at compile time instead: a single arena +# reduces metadata mapped at init and lowers RSS (the extension is not +# allocation-throughput-bound, so arena contention is a non-issue here). +ENV JEMALLOC_SYS_WITH_MALLOC_CONF="narenas:1" WORKDIR /tmp/dd/bottlecap RUN --mount=type=cache,target=/root/.cargo/git \ @@ -38,16 +42,21 @@ RUN --mount=type=cache,target=/root/.cargo/git \ else \ export FEATURES=default; \ fi; \ + # Tune codegen for the known Lambda CPUs: arm64 is Graviton2 (neoverse-n1), + # x86_64 uses the universally-safe x86-64-v2 baseline (NOT v3, which risks SIGILL). + if [ "${PLATFORM}" = "aarch64" ]; then TARGET_CPU="neoverse-n1"; else TARGET_CPU="x86-64-v2"; fi; \ if [ "${PLATFORM}" = "x86_64" ]; then \ # The `libddwaf` crate links against static objects that require `libclang_rt.builtins`, but # this is not presented to the linker by default on this platform, so we force it in. - export RUSTFLAGS="${RUSTFLAGS:-} -Clinker=clang -L$(dirname $(clang --print-file-name="libclang_rt.builtins-$(uname -m).a")) -lclang_rt.builtins-$(uname -m)"; \ + export RUSTFLAGS="${RUSTFLAGS:-} -Clinker=clang -L$(dirname $(clang --print-file-name="libclang_rt.builtins-$(uname -m).a")) -lclang_rt.builtins-$(uname -m) -Clink-arg=-Wl,-z,now -Clink-arg=-Wl,-z,relro -Ctarget-cpu=${TARGET_CPU}"; \ + else \ + export RUSTFLAGS="${RUSTFLAGS:-} -Clink-arg=-Wl,-z,now -Clink-arg=-Wl,-z,relro -Ctarget-cpu=${TARGET_CPU}"; \ fi; \ # We use a wrapper to allow `libddwaf-sys`' build.rs to be compiled with # -Ctarget-feature=-crt-static so that it is capable of dynamically loading # libclang; while still building bottlecap with a static CRT. RUSTC_WRAPPER=/tmp/dd/.cargo/musl.rustc-wrapper \ - cargo +stable build --verbose --locked --no-default-features \ + cargo build --verbose --locked --no-default-features \ --features="${FEATURES}" \ --profile="${PROFILE:-release}" && \ mkdir -p /tmp/out && \ diff --git a/images/Dockerfile.bottlecap.compile b/images/Dockerfile.bottlecap.compile index 12a23f64b..58d921f8f 100644 --- a/images/Dockerfile.bottlecap.compile +++ b/images/Dockerfile.bottlecap.compile @@ -16,13 +16,17 @@ RUN chmod +x /install-protoc.sh && /install-protoc.sh RUN curl https://sh.rustup.rs -sSf | \ sh -s -- --profile minimal \ --default-host "${PLATFORM}-unknown-linux-gnu" \ - --default-toolchain "stable-${PLATFORM}-unknown-linux-gnu" \ - --component rust-src \ + --default-toolchain none \ -y ENV PATH="${PATH}:/root/.cargo/bin" # Build the binary ENV RUSTFLAGS="-Cpanic=abort" +# tikv-jemallocator uses the `_rjem_` symbol prefix, so the runtime MALLOC_CONF +# env var is ignored. Tune jemalloc at compile time instead: a single arena +# reduces metadata mapped at init and lowers RSS (the extension is not +# allocation-throughput-bound, so arena contention is a non-issue here). +ENV JEMALLOC_SYS_WITH_MALLOC_CONF="narenas:1" ENV AWS_LC_FIPS_SYS_CC=clang ENV AWS_LC_FIPS_SYS_CXX=clang++ @@ -43,10 +47,13 @@ RUN --mount=type=cache,target=/usr/local/cargo/git \ export BUILD_MODE=debug; \ export BUILD_FLAG=""; \ fi; \ + # Tune codegen for the known Lambda CPUs: arm64 is Graviton2 (neoverse-n1), + # x86_64 uses the universally-safe x86-64-v2 baseline (NOT v3, which risks SIGILL). + if [ "${PLATFORM}" = "aarch64" ]; then TARGET_CPU="neoverse-n1"; else TARGET_CPU="x86-64-v2"; fi; \ # The `libddwaf` crate links against static objects that require `libclang_rt.builtins`, but # this is not presented to the linker by default on this platform, so we force it in. - export RUSTFLAGS="${RUSTFLAGS:-} -Clinker=clang -L$(dirname $(clang --print-file-name="libclang_rt.builtins-$(uname -m).a")) -lclang_rt.builtins-$(uname -m)"; \ - cargo +stable build --verbose --locked --no-default-features --features="${FEATURES}" ${BUILD_FLAG} && \ + export RUSTFLAGS="${RUSTFLAGS:-} -Clinker=clang -L$(dirname $(clang --print-file-name="libclang_rt.builtins-$(uname -m).a")) -lclang_rt.builtins-$(uname -m) -Clink-arg=-Wl,-z,now -Clink-arg=-Wl,-z,relro -Ctarget-cpu=${TARGET_CPU}"; \ + cargo build --verbose --locked --no-default-features --features="${FEATURES}" ${BUILD_FLAG} && \ mkdir -p /tmp/out && cp "/tmp/dd/bottlecap/target/${BUILD_MODE}/bottlecap" /tmp/out/bottlecap # Use smallest image possible diff --git a/images/Dockerfile.build_layer b/images/Dockerfile.build_layer index c931f7ddf..e08e6616d 100644 --- a/images/Dockerfile.build_layer +++ b/images/Dockerfile.build_layer @@ -5,19 +5,6 @@ ARG FILE_SUFFIX # Install dependencies RUN apt-get update && apt-get install -y zip binutils wget tar xz-utils -# UPX installation directly from GitHub -ENV UPX_VERSION=5.0.0 -RUN ARCH=$(uname -m) && \ - if [ "$ARCH" = "x86_64" ]; then \ - ARCH_NAME="amd64"; \ - elif [ "$ARCH" = "aarch64" ]; then \ - ARCH_NAME="arm64"; \ - fi && \ - wget https://github.com/upx/upx/releases/download/v${UPX_VERSION}/upx-${UPX_VERSION}-${ARCH_NAME}_linux.tar.xz && \ - tar -xf upx-${UPX_VERSION}-${ARCH_NAME}_linux.tar.xz && \ - mv upx-${UPX_VERSION}-${ARCH_NAME}_linux/upx /usr/local/bin/ && \ - rm -rf upx-${UPX_VERSION}-${ARCH_NAME}_linux upx-${UPX_VERSION}-${ARCH_NAME}_linux.tar.xz - RUN mkdir /extensions WORKDIR /extensions