Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
72288a1
WIP: trace filters
Eldolfin May 13, 2026
bd2defb
feat: ignore_resources in trace filters
Eldolfin May 13, 2026
c0724c6
fix: expose ignore_resouces in builder
Eldolfin May 13, 2026
29eb4f0
fix: add missing license header
Eldolfin May 15, 2026
58b5939
feat: add snapshot test for trace filters
Eldolfin May 15, 2026
859f929
fix: update LICENSE-3rdparty.csv and fix snapshot ordering
Eldolfin May 15, 2026
38a03ca
fix: allow old PascalCase fields in agent redis/memcached obfuscation…
Eldolfin May 18, 2026
9bab9f1
wip normalization
Eldolfin May 21, 2026
5ed2a51
Merge remote-tracking branch 'origin/main' into oscarld/APMSP-2763-cs…
Eldolfin May 21, 2026
3056bd2
WIP
Eldolfin May 22, 2026
e75b9bb
feat: finish normalization on trace filters
Eldolfin May 26, 2026
3abd80a
Merge remote-tracking branch 'origin/main' into oscarld/APMSP-2763-cs…
Eldolfin May 26, 2026
9ffd1c8
fix: remove bad change to agent_info's schema
Eldolfin May 26, 2026
c2596f1
feat: regex key filters
Eldolfin May 26, 2026
b200c2f
fix: fmt
Eldolfin May 26, 2026
4e0ff40
add tests
Eldolfin May 27, 2026
d22bab9
revert regex key filters
Eldolfin Jun 1, 2026
f8c4e39
fix: check ignore_resources before the others
Eldolfin Jun 1, 2026
e235455
fix: rename get_root_span_index
Eldolfin Jun 1, 2026
e7a8695
fix: remove ability to configure trace filters from the trace exporte…
Eldolfin Jun 1, 2026
4b26fba
fix: fmt
Eldolfin Jun 1, 2026
580ca86
fix: filter traces on the client only when css is enabled
Eldolfin Jun 2, 2026
5afa42a
Merge remote-tracking branch 'origin/main' into oscarld/APMSP-2763-cs…
Eldolfin Jun 2, 2026
db48e84
fix: bad merge
Eldolfin Jun 2, 2026
c88ad06
feat: trim tag filters
Eldolfin Jun 3, 2026
7265368
feat: count trace filter dropped traces in telemetry
Eldolfin Jun 3, 2026
c0cc28f
fix: clippy lint
Eldolfin Jun 4, 2026
e197714
fix: ChunksDroppedByTraceFilter reason comment typo
Eldolfin Jun 4, 2026
977d4a4
fix: pass dropped chunks by trace filters all the way
Eldolfin Jun 4, 2026
36c65f4
remove unimportant fixme
Eldolfin Jun 5, 2026
c0cce0d
test: apply suggestion of not waiting for nothing
Eldolfin Jun 5, 2026
4ea70a8
Revert "test: apply suggestion of not waiting for nothing"
Eldolfin Jun 5, 2026
32f1085
feat: add snapshot test using test agent
Eldolfin Jun 9, 2026
2c0ea25
remove old no test agent snapshot test
Eldolfin Jun 9, 2026
d50be09
feat: move tracefilter's conf arcswap to the trace exporter
Eldolfin Jun 9, 2026
128d7b2
feat: make implementation generic using a minimal span trait
Eldolfin Jun 10, 2026
dde028f
chore: bump test agent version v1.56.0 -> v1.16.1
Eldolfin Jun 10, 2026
4633ef8
fix: clippy
Eldolfin Jun 10, 2026
daae8db
feat: move trace_filter component to trace-utils
Eldolfin Jun 10, 2026
ab63da6
fix: apply suggestions
Eldolfin Jun 11, 2026
2fd1d04
feat: add documentation to public items
Eldolfin Jun 11, 2026
71705f3
fix: remove unrelated changes
Eldolfin Jun 11, 2026
00e7913
fix: put new test in integration test group
Eldolfin Jun 11, 2026
dc57d61
Merge remote-tracking branch 'origin/main' into oscarld/APMSP-2763-cs…
Eldolfin Jun 11, 2026
5a05414
fix: test span meta HashMap -> VecMap
Eldolfin Jun 11, 2026
6e5f6bf
fix: licensecheck ?
Eldolfin Jun 11, 2026
676e0b9
Revert "fix: licensecheck ?"
Eldolfin Jun 11, 2026
428a819
fix: remove redundant span dedup call that broke V1 snapshot test and…
Eldolfin Jun 11, 2026
1dec1b0
fix: new copyright dates
Eldolfin Jun 11, 2026
8996a2a
re add span deduping
Eldolfin Jun 11, 2026
b74e92e
fix(tests): update V1 snapshot to match test agent v1.61.1 behavior
Eldolfin Jun 11, 2026
c26d7ed
fix: make span trait minimal again and filter_trace only works on v04…
Eldolfin Jun 15, 2026
4efe15f
feat: make Span trait even more minimal by moving normalization to th…
Eldolfin Jun 15, 2026
cc7156b
Merge remote-tracking branch 'origin/main' into oscarld/APMSP-2763-cs…
Eldolfin Jun 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions libdd-common/src/regex_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! regexes requiring Unicode character class support.

#[cfg(all(feature = "regex-lite", not(feature = "require-regex-full")))]
pub use regex_lite::{escape, Captures, Regex, RegexBuilder, Replacer};
pub use regex_lite::{escape, Captures, Error, Regex, RegexBuilder, Replacer};

#[cfg(not(all(feature = "regex-lite", not(feature = "require-regex-full"))))]
pub use regex::{escape, Captures, Regex, RegexBuilder, Replacer};
pub use regex::{escape, Captures, Error, Regex, RegexBuilder, Replacer};
1 change: 1 addition & 0 deletions libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ libdd-common = { version = "4.2.0", path = "../libdd-common", default-features =
libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", default-features = false }
libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry", default-features = false, optional = true}
libdd-trace-protobuf = { version = "3.0.2", path = "../libdd-trace-protobuf" }
libdd-trace-normalization = { version = "2.0.0", path = "../libdd-trace-normalization" }
libdd-trace-stats = { version = "5.0.0", path = "../libdd-trace-stats", default-features = false }
libdd-trace-utils = { version = "8.0.0", path = "../libdd-trace-utils", default-features = false }
libdd-trace-obfuscation = { version = "4.0.0", path = "../libdd-trace-obfuscation", default-features = false, optional = true }
Expand Down
15 changes: 10 additions & 5 deletions libdd-data-pipeline/src/agent_info/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,25 @@ pub struct AgentInfoStruct {
/// Container tags hash from HTTP response header
pub container_tags_hash: Option<String>,
/// Exact-match tag filters applied before stats computation (root span only).
pub filter_tags: Option<FilterTagsConfig>,
#[serde(default)]
pub filter_tags: FilterTagsConfig,
/// Regex-match tag filters applied before stats computation (root span only).
pub filter_tags_regex: Option<FilterTagsConfig>,
#[serde(default)]
pub filter_tags_regex: FilterTagsConfig,
/// Regex patterns for root-span resource names; matching traces are excluded from stats.
pub ignore_resources: Option<Vec<String>>,
#[serde(default)]
pub ignore_resources: Vec<String>,
}

/// Require/reject lists for tag-based trace filters exposed by the agent /info endpoint.
#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)]
pub struct FilterTagsConfig {
/// All listed filters must match at least one root-span tag for the trace to be accepted.
pub require: Option<Vec<String>>,
#[serde(default)]
pub require: Vec<String>,
/// If any listed filter matches a root-span tag the trace is rejected.
pub reject: Option<Vec<String>>,
#[serde(default)]
pub reject: Vec<String>,
}

#[allow(missing_docs)]
Expand Down
11 changes: 11 additions & 0 deletions libdd-data-pipeline/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum MetricKind {
ChunksSent,
/// trace_chunks_dropped metric (reason: p0_drop)
ChunksDroppedP0,
/// trace_chunks_dropped metric (reason: trace_filters)
ChunksDroppedByTraceFilter,
/// trace_chunks_dropped metric (reason: serialization_error)
ChunksDroppedSerializationError,
/// trace_chunks_dropped metric (reason: send_failure)
Expand Down Expand Up @@ -102,6 +104,15 @@ const METRICS: &[Metric] = &[
namespace: MetricNamespace::Tracers,
tags: &[tag!["src_library", "libdatadog"], tag!["reason", "p0_drop"]],
},
Metric {
name: CHUNKS_DROPPED_STR,
metric_type: MetricType::Count,
namespace: MetricNamespace::Tracers,
tags: &[
tag!["src_library", "libdatadog"],
tag!["reason", "trace_filters"],
],
},
Metric {
name: CHUNKS_DROPPED_STR,
metric_type: MetricType::Count,
Expand Down
82 changes: 73 additions & 9 deletions libdd-data-pipeline/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use libdd_telemetry::worker::{
};
use libdd_trace_utils::{
send_with_retry::{SendWithRetryError, SendWithRetryResult},
span::trace_utils::DroppedStats,
trace_utils::SendDataResult,
};
use std::{collections::HashMap, time::Duration};
Expand Down Expand Up @@ -165,6 +166,7 @@ pub struct SendPayloadTelemetry {
bytes_sent: u64,
chunks_sent: u64,
chunks_dropped_p0: u64,
chunks_dropped_by_trace_filter: u64,
chunks_dropped_serialization_error: u64,
chunks_dropped_send_failure: u64,
responses_count_per_code: HashMap<u16, u64>,
Expand Down Expand Up @@ -193,15 +195,16 @@ impl SendPayloadTelemetry {
/// * `value` - The result of sending traces with retry
/// * `bytes_sent` - The number of bytes in the payload
/// * `chunks` - The number of trace chunks in the payload
/// * `chunks_dropped_p0` - The number of P0 trace chunks dropped due to sampling
/// * `dropped_stats` - Trace dropped stats from `stats::process_traces_for_stats`
pub fn from_retry_result(
value: &SendWithRetryResult,
bytes_sent: u64,
chunks: u64,
chunks_dropped_p0: u64,
dropped_stats: DroppedStats,
) -> Self {
let mut telemetry = Self {
chunks_dropped_p0,
chunks_dropped_p0: dropped_stats.dropped_p0_traces as u64,
chunks_dropped_by_trace_filter: dropped_stats.dropped_by_trace_filter as u64,
..Default::default()
};
match value {
Expand Down Expand Up @@ -288,6 +291,13 @@ impl TelemetryClient {
self.worker
.add_point(data.chunks_dropped_p0 as f64, key, vec![])?;
}
if data.chunks_dropped_by_trace_filter > 0 {
Comment thread
Eldolfin marked this conversation as resolved.
let key = self
.metrics
.get(metrics::MetricKind::ChunksDroppedByTraceFilter);
self.worker
.add_point(data.chunks_dropped_by_trace_filter as f64, key, vec![])?;
}
if data.chunks_dropped_serialization_error > 0 {
let key = self
.metrics
Expand Down Expand Up @@ -704,7 +714,16 @@ mod tests {
.unwrap(),
3,
));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5, 0);
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
4,
5,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand All @@ -726,7 +745,16 @@ mod tests {
.unwrap(),
3,
));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5, 10);
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
4,
5,
DroppedStats {
dropped_p0_traces: 10,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand All @@ -747,7 +775,16 @@ mod tests {
.body(Bytes::new())
.unwrap();
let result = Err(SendWithRetryError::Http(error_response, 5));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0);
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
1,
2,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand All @@ -766,7 +803,16 @@ mod tests {
HttpError::Network(anyhow::anyhow!("connection refused")),
5,
));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0);
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
1,
2,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand All @@ -781,7 +827,16 @@ mod tests {
#[test]
fn telemetry_from_timeout_error_test() {
let result = Err(SendWithRetryError::Timeout(5));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0);
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
1,
2,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand All @@ -797,7 +852,16 @@ mod tests {
#[test]
fn telemetry_from_build_error_test() {
let result = Err(SendWithRetryError::Build(5));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0);
let telemetry = SendPayloadTelemetry::from_retry_result(
&result,
1,
2,
DroppedStats {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
dropped_by_trace_filter: 0,
},
);
assert_eq!(
telemetry,
SendPayloadTelemetry {
Expand Down
2 changes: 2 additions & 0 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability};
use libdd_common::{parse_uri, tag, Endpoint};
use libdd_dogstatsd_client::new;
use libdd_shared_runtime::SharedRuntime;
use libdd_trace_utils::trace_filter::TraceFilterer;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -514,6 +515,7 @@ impl TraceExporterBuilder {
.agent_rates_payload_version_enabled
.then(AgentResponsePayloadVersion::new),
otlp_config,
trace_filterer: ArcSwap::from_pointee(TraceFilterer::with_empty_conf()),
})
}

Expand Down
24 changes: 18 additions & 6 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod trace_serializer;

// Re-export the builder
pub use builder::TraceExporterBuilder;
use libdd_trace_utils::{span::trace_utils::DroppedStats, trace_filter::TraceFilterer};

use self::agent_response::AgentResponse;
use self::metrics::MetricsEmitter;
Expand All @@ -30,7 +31,7 @@ use crate::{
health_metrics,
health_metrics::{HealthMetric, SendResult, TransportErrorType},
};
use arc_swap::ArcSwapOption;
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use http::header::HeaderMap;
use http::uri::PathAndQuery;
Expand Down Expand Up @@ -213,6 +214,7 @@ pub struct TraceExporter<C: HttpClientCapability + SleepCapability + MaybeSend +
agent_payload_response_version: Option<AgentResponsePayloadVersion>,
/// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent.
otlp_config: Option<OtlpTraceConfig>,
trace_filterer: ArcSwap<TraceFilterer>,
}

impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> TraceExporter<C> {
Expand Down Expand Up @@ -348,9 +350,9 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
.map(|s| s.as_str())
}

#[cfg(not(target_arch = "wasm32"))]
/// Reconcile in-process stats state with the latest agent info.
/// Async so the `Enabled` arm can await a stats-worker shutdown without `block_on`.
#[cfg(not(target_arch = "wasm32"))]
async fn check_agent_info(&self) {
let Some(agent_info) = agent_info::get_agent_info() else {
return;
Expand All @@ -363,6 +365,14 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
self.refresh_v1_active(&agent_info);
}

self.trace_filterer.store(Arc::new(TraceFilterer::new(
&agent_info.info.filter_tags.require,
&agent_info.info.filter_tags.reject,
&agent_info.info.filter_tags_regex.require,
&agent_info.info.filter_tags_regex.reject,
&agent_info.info.ignore_resources,
)));

// load_full() avoids holding an ArcSwap Guard (!Send) across .await.
let status = self.client_side_stats.status.load_full();
match &*status {
Expand Down Expand Up @@ -567,7 +577,8 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
mp_payload: Vec<u8>,
headers: HeaderMap,
chunks: usize,
#[cfg_attr(not(feature = "telemetry"), allow(unused_variables))] chunks_dropped_p0: usize,
#[cfg_attr(not(feature = "telemetry"), allow(unused_variables))]
dropped_stats: DroppedStats,
) -> Result<AgentResponse, TraceExporterError> {
let strategy = RetryStrategy::default();
let payload_len = mp_payload.len();
Expand All @@ -588,7 +599,7 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
&result,
payload_len as u64,
chunks as u64,
chunks_dropped_p0 as u64,
dropped_stats,
)) {
error!(?e, "Error sending telemetry");
}
Expand All @@ -605,11 +616,12 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra

// Process stats computation and drop non-sampled (p0) chunks.
// This must run before the OTLP path so that unsampled spans are not exported.
let dropped_p0_stats = stats::process_traces_for_stats(
let dropped_stats = stats::process_traces_for_stats(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a comment for this PR just a remark for future improvement. We should be able to submit telemetry from within the stats module rather than having to pass it to the send_traces_with_telemetry

&mut traces,
&mut header_tags,
&self.client_side_stats.status,
self.client_computed_top_level,
&self.trace_filterer.load(),
);

for chunk in &mut traces {
Expand Down Expand Up @@ -662,7 +674,7 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
prepared.data,
prepared.headers,
prepared.chunk_count,
dropped_p0_stats.dropped_p0_traces,
dropped_stats,
)
.await;

Expand Down
Loading
Loading