Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
798577f
feat: add support for automatic DSM context extraction
jeastham1993 Jun 18, 2026
f320471
chore: cleanup files
jeastham1993 Jun 18, 2026
579f6e6
Merge branch 'main' of github.com:DataDog/datadog-lambda-extension
jeastham1993 Jun 18, 2026
03c926d
fix: revert extension version
jeastham1993 Jun 18, 2026
e6135ed
chore: log DSM serialization failures
jeastham1993 Jun 18, 2026
13043c4
chore: PR feedback
jeastham1993 Jun 18, 2026
947380a
feat: add batch iteration for DSM checkpointing
jeastham1993 Jun 18, 2026
7a69746
chore: clippy fix
jeastham1993 Jun 18, 2026
ca5b7da
feat: add MSK support for DSM extraction
jeastham1993 Jun 18, 2026
9b923a5
chore: update event bridge naming logic
jeastham1993 Jun 18, 2026
c5dc415
feat: add DSM Java integration test
jeastham1993 Jun 19, 2026
7d710d7
Merge remote-tracking branch 'origin/main' into feat/extension-dsm-su…
jeastham1993 Jun 19, 2026
c27c642
feat: add payload size parsing
jeastham1993 Jun 19, 2026
e8f72a2
chore: clippy fix
jeastham1993 Jun 19, 2026
9d7b2df
chore: clippy fix
jeastham1993 Jun 19, 2026
eb34dad
chore: cargo fmt
jeastham1993 Jun 19, 2026
84efbfc
feat: update config flag after checking tracers
jeastham1993 Jun 19, 2026
f99e4b3
chore: cargo fmt
jeastham1993 Jun 24, 2026
aa65e38
chore: update comment on DD DSM enabled:
jeastham1993 Jun 25, 2026
6d5b41f
fix: resolve MSK legacy context parsing
jeastham1993 Jun 25, 2026
42bd8bd
feat: add requestID checking on process
jeastham1993 Jun 25, 2026
aa7b01f
chore: move DSM setup behind flag
jeastham1993 Jun 25, 2026
5b0a3ea
chore: update comment on DD skect usage
jeastham1993 Jun 25, 2026
f7f8028
feat: update support for LMI
jeastham1993 Jun 25, 2026
745c6f5
chore: clippy fix
jeastham1993 Jun 25, 2026
9112d74
chore: update DSM handling
jeastham1993 Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitlab/datasources/test-suites.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ test_suites:
- name: oom
- name: lmi-oom
- name: payload-size
- name: dsm
1 change: 1 addition & 0 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ regex = { version = "1.10", default-features = false }
reqwest = { version = "0.12.11", features = ["json", "http2"], default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
serde_bytes = { version = "0.11", default-features = false, features = ["std"] }
# DSM pipeline-stats serialization (msgpack + gzip) for extension-side checkpoints.
rmp-serde = { version = "1.3.1", default-features = false }
flate2 = { version = "1.1", default-features = false, features = ["rust_backend"] }
thiserror = { version = "1.0", default-features = false }
# Transitive dependency (pulled in via cookie). Pinned to >=0.3.47 so cargo audit / CI passes (RUSTSEC-2026-0009).
time = { version = "0.3.47", default-features = false }
Expand Down Expand Up @@ -95,9 +99,6 @@ tower = { version = "0.5", features = ["util"] }
mock_instant = "0.6"
serial_test = "3.1"
tempfile = "3.20"
# fake-intake test harness: decode msgpack+gzip stats payloads on arrival
rmp-serde = { version = "1.3.1", default-features = false }
flate2 = { version = "1.1", default-features = false, features = ["rust_backend"] }

[build-dependencies]
# No external dependencies needed for the build script
Expand Down
55 changes: 54 additions & 1 deletion bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,54 @@ async fn extension_loop_active(
.await;

let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(config)));

// Shared proxy aggregator (used by the trace agent's proxy endpoints and,
// when enabled, the extension-side DSM processor).
let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default()));

// Extension-side Data Streams Monitoring (consume checkpoints), gated by
// DD_DATA_STREAMS_ENABLED.
let dsm_processor = if config.ext.dsm_consume_enabled {
let dsm_service = config
.service
.clone()
.or_else(|| tags_provider.get_canonical_resource_name())
.unwrap_or_else(|| "aws.lambda".to_string())
.to_lowercase();
let dsm_env = config.env.clone().unwrap_or_default();
let dsm_version = config.version.clone().unwrap_or_default();
let mut dsm_tags: Vec<String> = config
.tags
.iter()
.map(|(key, value)| format!("{key}:{value}"))
.collect();
dsm_tags.sort();

debug!(
"DSM startup config: enabled={}, service={}, env={}, version={}, site={}, tags={:?}",
config.ext.dsm_consume_enabled,
dsm_service,
dsm_env,
dsm_version,
config.site,
dsm_tags
);

Some(Arc::new(
bottlecap::traces::data_streams::DsmProcessor::new(
dsm_service,
dsm_env,
EXTENSION_VERSION.to_string(),
dsm_version,
dsm_tags,
&config.site,
Arc::clone(&proxy_aggregator),
),
))
} else {
None
};

// Lifecycle Invocation Processor
let (invocation_processor_handle, invocation_processor_service) =
InvocationProcessorService::new(
Expand All @@ -339,6 +387,7 @@ async fn extension_loop_active(
metrics_aggregator_handle.clone(),
Arc::clone(&propagator),
durable_context_tx,
dsm_processor.clone(),
);
tokio::spawn(async move {
invocation_processor_service.run().await;
Expand Down Expand Up @@ -372,6 +421,7 @@ async fn extension_loop_active(
invocation_processor_handle.clone(),
appsec_processor.clone(),
&shared_client,
Arc::clone(&proxy_aggregator),
);

let api_runtime_proxy_shutdown_signal = start_api_runtime_proxy(
Expand Down Expand Up @@ -430,6 +480,7 @@ async fn extension_loop_active(
let stats_flusher_clone = Arc::clone(&stats_flusher);
let proxy_flusher_clone = proxy_flusher.clone();
let metrics_aggr_handle_clone = metrics_aggregator_handle.clone();
let dsm_processor_clone = dsm_processor.clone();

// In Managed Instance mode, create a separate interval for the background flusher task.
// We don't reuse race_flush_interval because we need to configure the missed tick
Expand Down Expand Up @@ -460,6 +511,7 @@ async fn extension_loop_active(
proxy_flusher_clone,
metrics_flushers_clone,
metrics_aggr_handle_clone,
dsm_processor_clone,
);

loop {
Expand Down Expand Up @@ -634,6 +686,7 @@ async fn extension_loop_active(
proxy_flusher.clone(),
Arc::clone(&metrics_flushers),
metrics_aggregator_handle.clone(),
dsm_processor.clone(),
);
handle_next_invocation(next_lambda_response, &invocation_processor_handle).await;
loop {
Expand Down Expand Up @@ -1104,6 +1157,7 @@ fn start_trace_agent(
invocation_processor_handle: InvocationProcessorHandle,
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
client: &Client,
proxy_aggregator: Arc<TokioMutex<proxy_aggregator::Aggregator>>,
) -> (
Sender<SendDataBuilderInfo>,
Arc<trace_flusher::TraceFlusher>,
Expand Down Expand Up @@ -1168,7 +1222,6 @@ fn start_trace_agent(
tokio::spawn(span_dedup_service.run());

// Proxy
let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default()));
let proxy_flusher = Arc::new(ProxyFlusher::new(
api_key_factory.clone(),
Arc::clone(&proxy_aggregator),
Expand Down
58 changes: 57 additions & 1 deletion bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ pub struct LambdaConfig {
/// without durable execution context enrichment. Defaults to 0 until the tracer-side
/// durable execution support is released; set to 50 to re-enable enrichment.
pub lambda_durable_function_log_buffer_size: usize,

// Data Streams Monitoring
/// Enable extension-side DSM consume checkpoints. Gated by the same
/// `DD_DATA_STREAMS_ENABLED` flag the tracer libraries use; the extension
/// and tracer never emit checkpoints for the same runtime, so sharing the
/// flag cannot double-count.
Comment thread
jeastham1993 marked this conversation as resolved.
/// Java/.NET/Go - Datadog Lambda supports calls to '/start-invocation', no tracer support for parsing Lambda payloads
/// Python - Wrapper script in datadog-lambda-python extracts context and DSM, but does not call `/start-invocation`
/// JS - Wrapper script in datadog-lambda-js extracts context and DSM, but does not call `/start-invocation`
pub dsm_consume_enabled: bool,
/// Fallback DSM `exchange` (event bus name) used for `EventBridge` consume
/// checkpoints when it cannot be derived from the event payload
/// (`DD_DSM_EXCHANGE_NAME`).
pub dsm_exchange_name: Option<String>,
/// Consumer group used for `MSK`/Kafka DSM consume checkpoints, which is not
/// present in the Lambda event payload (`DD_DSM_KAFKA_GROUP`).
pub dsm_kafka_group: Option<String>,
}

impl Default for LambdaConfig {
Expand All @@ -106,6 +123,9 @@ impl Default for LambdaConfig {
api_security_sample_delay: Duration::from_secs(30),
custom_metrics_exclude_tags: Vec::new(),
lambda_durable_function_log_buffer_size: 0,
dsm_consume_enabled: false,
dsm_exchange_name: None,
dsm_kafka_group: None,
}
}
}
Expand Down Expand Up @@ -182,6 +202,18 @@ pub struct LambdaConfigSource {
/// 0 (hold mechanism disabled).
#[serde(deserialize_with = "deser_opt_lossless")]
pub lambda_durable_function_log_buffer_size: Option<usize>,

/// `DD_DATA_STREAMS_ENABLED` — enable extension-side DSM consume
/// checkpoints. Shared with the tracer libraries; merges into the
/// `dsm_consume_enabled` config field.
#[serde(deserialize_with = "deser_opt_bool")]
pub data_streams_enabled: Option<bool>,
/// `DD_DSM_EXCHANGE_NAME` — fallback exchange name for `EventBridge` DSM checkpoints.
#[serde(deserialize_with = "deser_opt_str")]
pub dsm_exchange_name: Option<String>,
/// `DD_DSM_KAFKA_GROUP` — consumer group for MSK/Kafka DSM consume checkpoints.
#[serde(deserialize_with = "deser_opt_str")]
pub dsm_kafka_group: Option<String>,
}

impl DatadogConfigExtension for LambdaConfig {
Expand All @@ -207,7 +239,16 @@ impl DatadogConfigExtension for LambdaConfig {
api_security_sample_delay,
lambda_durable_function_log_buffer_size,
],
option: [span_dedup_timeout, api_key_secret_reload_interval, appsec_rules],
option: [span_dedup_timeout, api_key_secret_reload_interval, appsec_rules, dsm_exchange_name, dsm_kafka_group],
);

// data_streams_enabled (source / DD_DATA_STREAMS_ENABLED) →
// dsm_consume_enabled (config)
datadog_agent_config::merge_option_to_value!(
self,
dsm_consume_enabled,
source,
data_streams_enabled
);

// OR-merge serverless_logs_enabled with the logs_enabled alias. Either
Expand Down Expand Up @@ -503,6 +544,21 @@ mod lambda_config_tests {
assert!(!config.ext.lambda_extension_compute_stats);
}

#[test]
fn dsm_consume_enabled_from_data_streams_env() {
let config = load(|jail| {
jail.set_env("DD_DATA_STREAMS_ENABLED", "true");
Ok(())
});
assert!(config.ext.dsm_consume_enabled);
}

#[test]
fn dsm_consume_enabled_defaults_false() {
let config = load(|_| Ok(()));
assert!(!config.ext.dsm_consume_enabled);
}

// ---- Duration fields ----

#[test]
Expand Down
17 changes: 17 additions & 0 deletions bottlecap/src/flushing/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ pub struct FlushingService {
proxy_flusher: Arc<ProxyFlusher>,
metrics_flushers: Arc<Vec<MetricsFlusher>>,

/// Optional extension-side DSM processor. When present, its aggregated
/// pipeline-stats payload is drained into the proxy aggregator immediately
/// before each proxy flush. `None` unless `DD_DATA_STREAMS_ENABLED` is set.
dsm_processor: Option<Arc<crate::traces::data_streams::DsmProcessor>>,

// Metrics aggregator handle for getting data to flush
metrics_aggr_handle: MetricsAggregatorHandle,

Expand All @@ -46,13 +51,15 @@ impl FlushingService {
proxy_flusher: Arc<ProxyFlusher>,
metrics_flushers: Arc<Vec<MetricsFlusher>>,
metrics_aggr_handle: MetricsAggregatorHandle,
dsm_processor: Option<Arc<crate::traces::data_streams::DsmProcessor>>,
) -> Self {
Self {
logs_flusher,
trace_flusher,
stats_flusher,
proxy_flusher,
metrics_flushers,
dsm_processor,
metrics_aggr_handle,
handles: FlushHandles::new(),
}
Expand Down Expand Up @@ -123,6 +130,11 @@ impl FlushingService {
sf.flush(false, None).await.unwrap_or_default()
}));

// Drain DSM pipeline stats into the proxy aggregator before flushing.
if let Some(dsm) = &self.dsm_processor {
dsm.drain_into_proxy().await;
}

// Spawn proxy flush
let pf = self.proxy_flusher.clone();
self.handles
Expand Down Expand Up @@ -324,6 +336,11 @@ impl FlushingService {
})
.collect();

// Drain DSM pipeline stats into the proxy aggregator before flushing.
if let Some(dsm) = &self.dsm_processor {
dsm.drain_into_proxy().await;
}

tokio::join!(
self.logs_flusher.flush(None),
futures::future::join_all(metrics_futures),
Expand Down
Loading
Loading