feat(dsm): Add support for automatic DSM context extraction inside the extension #1265

Open
jeastham1993 wants to merge 26 commits into main from feat/extension-dsm-support
Conversation

@jeastham1993

@jeastham1993 jeastham1993 commented Jun 18, 2026

Please include Jira ticket in title.

Overview

Update the trace context propagation support in universal instrumentation to include the automatic extraction of Data Streams Monitoring context for SQS, SNS, Kinesis and EventBridge.

Testing

Added unit tests. Manually tested the functionality with Java and Go, and both work correctly. .NET can't be tested manually yet because the .NET tracer needs updating to support the 2.x version of Amazon.Lambda.RuntimeSupport.

Copilot AI review requested due to automatic review settings June 18, 2026 07:32
@jeastham1993 jeastham1993 requested review from a team as code owners June 18, 2026 07:32
@jeastham1993 jeastham1993 requested a review from lym953 June 18, 2026 07:32
@datadog-official

datadog-official Bot commented Jun 18, 2026

Contributor

Pipelines

⚠️ Warnings

🚦 5 Pipeline jobs failed

DataDog/datadog-lambda-extension | integration-suite: [on-demand]

DataDog/datadog-lambda-extension | e2e-test-status (amd64, fips)

DataDog/datadog-lambda-extension | integration-suite: [lmi]

View all 5 failed jobs.

🔗 Commit SHA: 9112d74

Copilot AI left a comment

Contributor

Pull request overview

This PR extends Bottlecap’s universal instrumentation trace context propagation to automatically extract and emit Data Streams Monitoring (DSM) consume-side checkpoints within the extension for SQS, SNS, Kinesis, and EventBridge. It adds a DSM aggregation/serialization pipeline that drains into the existing proxy flush path and introduces configuration knobs to enable DSM consume extraction and provide an EventBridge exchange fallback.

Changes:

  • Add a new traces::data_streams module implementing DSM context decode, pathway hashing, checkpoint computation, aggregation, and msgpack+gzip payload generation.
  • Wire an optional DsmProcessor into invocation start processing (universal instrumentation) and into the flushing pipeline so pipeline-stats are shipped via the proxy flusher.
  • Add trigger-specific DSM edge tags for SQS/SNS/Kinesis/EventBridge and introduce DD_DSM_CONSUME_ENABLED / DD_DSM_EXCHANGE_NAME config support.
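
As a rough illustration of the aggregation step listed above, the sketch below groups checkpoint latencies into fixed 10-second buckets, the width mentioned in the per-file summary. The `BucketAggregator` type, its fields, and the plain `Vec<u64>` standing in for a DDSketch are assumptions made for brevity; they are not the PR's actual types.

```rust
use std::collections::BTreeMap;

/// Bucket width in nanoseconds (10 s, per the review summary).
const BUCKET_NANOS: u64 = 10_000_000_000;

/// Aligns a timestamp down to the start of the bucket that contains it.
pub fn bucket_start(timestamp_nanos: u64) -> u64 {
    timestamp_nanos - timestamp_nanos % BUCKET_NANOS
}

/// Hypothetical aggregator: maps a bucket start to the latencies seen in it.
/// A real implementation would feed a sketch instead of storing raw values.
#[derive(Default)]
pub struct BucketAggregator {
    buckets: BTreeMap<u64, Vec<u64>>,
}

impl BucketAggregator {
    /// Records one checkpoint latency under the bucket of its timestamp.
    pub fn record(&mut self, timestamp_nanos: u64, latency_nanos: u64) {
        self.buckets
            .entry(bucket_start(timestamp_nanos))
            .or_default()
            .push(latency_nanos);
    }

    /// Removes and returns all buckets in ascending time order.
    pub fn drain(&mut self) -> Vec<(u64, Vec<u64>)> {
        std::mem::take(&mut self.buckets).into_iter().collect()
    }
}
```

Draining leaves the aggregator empty, which matches the idea of handing completed buckets to a flush path and starting fresh.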

Reviewed changes

Copilot reviewed 25 out of 26 changed files in this pull request and generated 7 comments.

Show a summary per file
| File | Description |
| --- | --- |
| bottlecap/src/traces/mod.rs | Exposes the new data_streams module from traces. |
| bottlecap/src/traces/data_streams/mod.rs | DSM module entrypoint; re-exports key DSM types/functions. |
| bottlecap/src/traces/data_streams/context.rs | Implements inbound DSM pathway context decoding (base64 + zigzag varints). |
| bottlecap/src/traces/data_streams/pathway.rs | Implements dd-trace-js-compatible pathway hash computation. |
| bottlecap/src/traces/data_streams/propagation_hash.rs | Implements optional FNV-1 propagation hash. |
| bottlecap/src/traces/data_streams/checkpoint.rs | Computes consume-side checkpoints from extracted context + tags. |
| bottlecap/src/traces/data_streams/sketch.rs | Implements tracer-compatible DDSketch + protobuf bytes serialization. |
| bottlecap/src/traces/data_streams/aggregator.rs | Aggregates checkpoints into 10s buckets and serializes msgpack payloads. |
| bottlecap/src/traces/data_streams/processor.rs | Bridges checkpoint aggregation to proxy flush by gzipping/enqueueing proxy requests. |
| bottlecap/src/traces/data_streams/fixtures/sketch_golden.json | Golden vectors to validate DDSketch compatibility. |
| bottlecap/src/tags/lambda/tags.rs | Updates hardcoded extension version string used in tags/logging. |
| bottlecap/src/proxy/interceptor.rs | Enables universal instrumentation processing when experimental proxy env var is set. |
| bottlecap/src/lifecycle/invocation/triggers/mod.rs | Adds Trigger::get_dsm_edge_tags() default hook. |
| bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs | Adds SQS DSM consume edge tag derivation. |
| bottlecap/src/lifecycle/invocation/triggers/sns_event.rs | Adds SNS DSM consume edge tag derivation. |
| bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs | Adds Kinesis DSM consume edge tag derivation. |
| bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs | Adds EventBridge DSM consume tags + best-effort bus name extraction + tests. |
| bottlecap/src/lifecycle/invocation/processor.rs | Hooks DSM consume recording into universal instrumentation start; adds EventBridge exchange fallback. |
| bottlecap/src/lifecycle/invocation/processor_service.rs | Threads optional DsmProcessor into Processor initialization. |
| bottlecap/src/flushing/service.rs | Drains DSM pipeline-stats into proxy aggregator immediately before proxy flush. |
| bottlecap/src/config/yaml.rs | Adds YAML config fields for DSM consume enabling and exchange fallback. |
| bottlecap/src/config/mod.rs | Adds Config fields: dsm_consume_enabled, dsm_exchange_name. |
| bottlecap/src/config/env.rs | Adds env config parsing/merging for DD_DSM_CONSUME_ENABLED and DD_DSM_EXCHANGE_NAME. |
| bottlecap/src/bin/bottlecap/main.rs | Constructs shared proxy aggregator, conditionally instantiates DSM processor, wires it into services. |
| bottlecap/Cargo.toml | Promotes msgpack/gzip dependencies to main deps and adds serde_bytes. |
| bottlecap/Cargo.lock | Adds serde_bytes to resolved dependency set. |
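
To make the "zigzag varints" part of the context.rs description concrete, here is a minimal sketch of decoding an already base64-decoded pathway context. It assumes a layout of an 8-byte little-endian hash followed by two zigzag-encoded varint timestamps; that layout, the `PathwayContext` struct, and all function names are assumptions for illustration and may differ from the code in this PR.

```rust
/// Hypothetical decoded context; field names are illustrative only.
#[derive(Debug, PartialEq)]
pub struct PathwayContext {
    pub hash: u64,
    pub pathway_start_ms: i64,
    pub edge_start_ms: i64,
}

/// Reads one unsigned LEB128 varint, returning the value and bytes consumed.
fn read_uvarint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    for (i, &byte) in buf.iter().enumerate() {
        if i >= 10 {
            return None; // longer than any valid u64 encoding
        }
        value |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None // input ended before the terminating byte
}

/// Maps a zigzag-encoded unsigned value back to a signed one.
fn zigzag_decode(n: u64) -> i64 {
    ((n >> 1) as i64) ^ -((n & 1) as i64)
}

/// Decodes the assumed layout: 8-byte LE hash, then two zigzag varints.
pub fn decode_pathway_context(bytes: &[u8]) -> Option<PathwayContext> {
    let hash_bytes: [u8; 8] = bytes.get(..8)?.try_into().ok()?;
    let rest = &bytes[8..];
    let (start, used) = read_uvarint(rest)?;
    let (edge, _) = read_uvarint(&rest[used..])?;
    Some(PathwayContext {
        hash: u64::from_le_bytes(hash_bytes),
        pathway_start_ms: zigzag_decode(start),
        edge_start_ms: zigzag_decode(edge),
    })
}
```

Returning `Option` rather than panicking keeps malformed or truncated headers from affecting the invocation path; the caller can simply skip the checkpoint.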

Comment thread bottlecap/src/tags/lambda/tags.rs Outdated
Comment thread bottlecap/src/traces/data_streams/aggregator.rs Outdated
Comment thread bottlecap/src/traces/data_streams/processor.rs Outdated
Comment thread bottlecap/src/traces/data_streams/processor.rs
Comment thread bottlecap/src/traces/data_streams/context.rs
Comment thread bottlecap/src/lifecycle/invocation/processor.rs Outdated
Comment thread bottlecap/src/traces/mod.rs

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 579f6e6d9b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread bottlecap/src/tags/lambda/tags.rs Outdated
Comment thread bottlecap/src/traces/data_streams/processor.rs
Comment thread bottlecap/src/lifecycle/invocation/processor.rs Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 947380af7b

Comment thread bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ca5b7da286

Comment thread bottlecap/src/bin/bottlecap/main.rs Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9b923a55cf

// functions whose tracer does not drive the invocation lifecycle.
let experimental_proxy_enabled = std::env::var("DD_EXPERIMENTAL_ENABLE_PROXY")
    .is_ok_and(|v| v.eq_ignore_ascii_case("true"));
if aws_config.aws_lwa_proxy_lambda_runtime_api.is_some() || experimental_proxy_enabled {

P2: Drive DSM extraction when the proxy already has the payload

When the runtime API proxy is running for the Datadog wrapper because AppSec is enabled, and DD_DSM_CONSUME_ENABLED=true but no tracer calls /lambda/start-invocation, this condition stays false unless LWA or DD_EXPERIMENTAL_ENABLE_PROXY is also set. In that no-tracer/AppSec setup the event body is available here but lwa::process_invocation_next is skipped, so process_on_universal_instrumentation_start never records any DSM consume checkpoints; include the DSM flag, or all proxy-active wrapper cases, in this gate.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c5dc4159e2

Comment thread bottlecap/src/lifecycle/invocation/processor.rs Outdated
…pport

# Conflicts:
#	bottlecap/src/config/env.rs
#	bottlecap/src/config/mod.rs
#	bottlecap/src/config/yaml.rs

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7d710d7418

Comment thread bottlecap/src/bin/bottlecap/main.rs Outdated
bottlecap::traces::data_streams::DsmProcessor::new(
    service,
    env,
    env!("CARGO_PKG_VERSION").to_string(),

P2: Use the extension version for DSM payloads

When DD_DSM_CONSUME_ENABLED is set, every extension-generated pipeline_stats payload reports TracerVersion from Cargo's package version, which is currently the crate's fixed 0.1.0 rather than the released Lambda extension version exposed via EXTENSION_VERSION. That makes DSM intake data from different extension releases indistinguishable for rollout/debugging of this new feature; pass the extension version constant instead of CARGO_PKG_VERSION.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c27c642406

}

#[derive(Serialize)]
struct StatsPayloadSer {

P2: Include version and global tags in DSM payloads

When DD_VERSION or DD_TAGS are configured, extension-produced DSM points will not carry those unified service/custom tags because this payload shape has no Version or Tags fields, while the proxy flusher only adds _dd.origin and functionname headers. As a result data_streams.latency emitted by DD_DSM_CONSUME_ENABLED cannot be filtered or grouped by deployment version, team, or other global tags even though the rest of the extension telemetry can; pass config.version and config.tags into the DSM aggregator and serialize them with the payload.
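
One way to picture the suggestion is a payload header that carries the version and global tags alongside service and env. The sketch below is only an illustration: `StatsPayloadHeader`, `build_header`, and `parse_global_tags` are hypothetical names, the comma-separated `key:value` tag format is an assumption, and serialization (msgpack in this PR) is left out to keep the example dependency-free.

```rust
/// Hypothetical payload header; field names are illustrative, not the
/// shape of `StatsPayloadSer` in this PR.
#[derive(Debug, PartialEq)]
pub struct StatsPayloadHeader {
    pub service: String,
    pub env: String,
    pub version: Option<String>,
    pub tags: Vec<String>,
}

/// Splits a comma-separated `key:value` list, dropping empty entries.
/// (Assumed input format for this sketch.)
pub fn parse_global_tags(raw: &str) -> Vec<String> {
    raw.split(',')
        .map(str::trim)
        .filter(|tag| !tag.is_empty())
        .map(str::to_owned)
        .collect()
}

/// Builds the header from already-resolved configuration values.
pub fn build_header(
    service: &str,
    env: &str,
    version: Option<&str>,
    raw_tags: Option<&str>,
) -> StatsPayloadHeader {
    StatsPayloadHeader {
        service: service.to_owned(),
        env: env.to_owned(),
        // Treat an empty version the same as an unset one.
        version: version.filter(|v| !v.is_empty()).map(str::to_owned),
        tags: raw_tags.map(parse_global_tags).unwrap_or_default(),
    }
}
```

Keeping `version` optional and `tags` possibly empty means payloads from functions without those settings keep the same shape as before.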

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f99e4b3c27

Comment thread bottlecap/src/lifecycle/invocation/processor.rs

@duncanista duncanista left a comment

Contributor

a few things worth digging into before merge, mostly architecture and correctness plus one cold-start nit. nothing blocking that can't be sorted in-thread, inline below.

Comment thread bottlecap/src/bin/bottlecap/main.rs Outdated
Comment thread bottlecap/src/config/mod.rs
Comment thread bottlecap/src/lifecycle/invocation/processor.rs
Comment thread bottlecap/src/traces/data_streams/processor.rs
Comment thread bottlecap/src/traces/data_streams/sketch.rs
@jeastham1993

Author

@duncanista think that's everything addressed 👍

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f7f80283aa

Comment on lines +211 to +212
let dsm_consume_enabled = std::env::var("DD_DATA_STREAMS_ENABLED")
    .is_ok_and(|v| v.eq_ignore_ascii_case("true"));

P2: Honor configured DSM flag in proxy gate

When Data Streams is enabled through the normal config path, e.g. datadog.yaml with data_streams_enabled: true, main creates the DSM processor from config.ext.dsm_consume_enabled, but this proxy gate re-reads only the DD_DATA_STREAMS_ENABLED environment variable. In an AppSec/datadog-wrapper setup with no tracer /lambda/start-invocation, the intercepted payload is available but this condition remains false, so the DSM hook never records consume checkpoints despite the feature being enabled; pass the resolved config flag into the interceptor instead of checking the environment here.
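
A minimal sketch of that suggestion follows: the flag is resolved once from all sources and the interceptor receives the result instead of re-reading the environment. `ResolvedConfig`, `resolve`, and `Interceptor` are hypothetical names, and the precedence shown (an explicit environment value wins over YAML) is an assumption for illustration, not a statement about how bottlecap merges config.

```rust
/// Hypothetical resolved configuration; only the field relevant here.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ResolvedConfig {
    pub dsm_consume_enabled: bool,
}

/// Resolves the flag once. Assumed precedence: a parseable environment
/// value wins, then the YAML value, otherwise the feature is off.
pub fn resolve(env_value: Option<&str>, yaml_value: Option<bool>) -> ResolvedConfig {
    let from_env = env_value.and_then(|v| match v.trim().to_ascii_lowercase().as_str() {
        "true" => Some(true),
        "false" => Some(false),
        _ => None,
    });
    ResolvedConfig {
        dsm_consume_enabled: from_env.or(yaml_value).unwrap_or(false),
    }
}

/// Hypothetical interceptor that is handed the resolved flag at construction.
pub struct Interceptor {
    lwa_proxy_configured: bool,
    dsm_consume_enabled: bool,
}

impl Interceptor {
    pub fn new(config: ResolvedConfig, lwa_proxy_configured: bool) -> Self {
        Self {
            lwa_proxy_configured,
            dsm_consume_enabled: config.dsm_consume_enabled,
        }
    }

    /// The gate no longer consults the environment directly.
    pub fn should_process_invocation(&self) -> bool {
        self.lwa_proxy_configured || self.dsm_consume_enabled
    }
}
```

With this shape, a YAML-only setting and an environment-only setting reach the gate through the same field, so the two code paths cannot disagree.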

Comment on lines +290 to +292
let carrier = record
    .get("headers")
    .map_or_else(HashMap::new, headers_to_string_map);

P2: Preserve raw Kafka pathway context bytes

For MSK/Kafka records that still carry the legacy dd-pathway-ctx header, the Lambda event exposes the header value as bytes, and those bytes are intentionally not UTF-8-safe because the first 8 bytes are an arbitrary pathway hash. This added DSM path converts the record headers through headers_to_string_map, which only keeps values that pass String::from_utf8, so legacy raw pathway contexts are dropped before extract_pathway_context can decode them with from_bytes, breaking continuity for those Kafka consumers; keep raw bytes for dd-pathway-ctx or encode that header losslessly.
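
The "keep raw bytes" option could look roughly like the sketch below: the pathway context header is carried through as bytes regardless of UTF-8 validity, while other headers are still converted to strings. `HeaderValue` and `headers_to_carrier` are hypothetical names introduced here; only the header name `dd-pathway-ctx` comes from the comment above.

```rust
use std::collections::HashMap;

/// Header that may carry binary pathway context.
const PATHWAY_CTX_HEADER: &str = "dd-pathway-ctx";

/// Hypothetical carrier value that can hold text or raw bytes.
#[derive(Debug, PartialEq)]
pub enum HeaderValue {
    Text(String),
    Raw(Vec<u8>),
}

/// Builds a carrier from (name, bytes) pairs without losing the binary
/// pathway context. Other headers that are not valid UTF-8 are dropped,
/// mirroring the string-only behaviour described in the comment.
pub fn headers_to_carrier(headers: &[(&str, &[u8])]) -> HashMap<String, HeaderValue> {
    let mut carrier = HashMap::new();
    for &(name, bytes) in headers {
        let key = name.to_ascii_lowercase();
        if key == PATHWAY_CTX_HEADER {
            carrier.insert(key, HeaderValue::Raw(bytes.to_vec()));
        } else if let Ok(text) = std::str::from_utf8(bytes) {
            carrier.insert(key, HeaderValue::Text(text.to_owned()));
        }
    }
    carrier
}
```

The alternative mentioned in the comment, encoding the header losslessly (for example as base64), would keep a string-only carrier at the cost of an extra decode step downstream.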

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 745c6f5274

Comment thread bottlecap/src/proxy/interceptor.rs Outdated
    .is_ok_and(|v| v.eq_ignore_ascii_case("true"));
if aws_config.aws_lwa_proxy_lambda_runtime_api.is_some()
    || experimental_proxy_enabled
    || dsm_consume_enabled

P2: Avoid LWA reparenting for DSM-only proxy starts

When the Runtime API proxy is active for AppSec and DD_DATA_STREAMS_ENABLED=true in a runtime that still calls /lambda/start-invocation (Java/Go/.NET), this new branch runs lwa::process_invocation_next, which unconditionally queues reparenting with a freshly generated invocation span id, while the later tracer start replaces the invocation span id from x-datadog-span-id. The trace flusher then reparents child spans to the synthetic id that is never emitted, so traces with inbound context become orphaned; drive only the DSM extraction here or avoid adding LWA reparenting when a tracer start is expected.
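
One possible reading of "drive only the DSM extraction here" is to replace the single boolean gate with a three-way decision, so that a DSM-only start never queues reparenting. The sketch below is an assumption about how that could be structured; `ProxyStartAction` and `choose_proxy_start_action` are hypothetical names and do not appear in the PR.

```rust
/// Hypothetical decision for what the proxy does when it sees the next
/// invocation payload.
#[derive(Debug, PartialEq, Eq)]
pub enum ProxyStartAction {
    /// Run the full LWA path, including span reparenting.
    FullLwa,
    /// Only record DSM consume checkpoints; span ids are left to the tracer.
    DsmOnly,
    /// Nothing to do in the proxy for this invocation.
    Skip,
}

/// LWA or the experimental proxy keeps the existing behaviour; DSM on its
/// own selects the extraction-only path so no synthetic span id is queued.
pub fn choose_proxy_start_action(
    lwa_or_experimental_proxy: bool,
    dsm_consume_enabled: bool,
) -> ProxyStartAction {
    if lwa_or_experimental_proxy {
        ProxyStartAction::FullLwa
    } else if dsm_consume_enabled {
        ProxyStartAction::DsmOnly
    } else {
        ProxyStartAction::Skip
    }
}
```

Making the DSM-only case its own variant forces the call site to handle it explicitly, which is where the reparenting step would be left out.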

3 participants