diff --git a/Cargo.lock b/Cargo.lock index fff06dbaa3..148b202d9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3002,6 +3002,7 @@ dependencies = [ "libdd-shared-runtime", "libdd-telemetry", "libdd-tinybytes", + "libdd-trace-normalization", "libdd-trace-obfuscation", "libdd-trace-protobuf", "libdd-trace-stats", diff --git a/libdd-common/src/regex_engine.rs b/libdd-common/src/regex_engine.rs index f3674f6e12..c5fb7d7973 100644 --- a/libdd-common/src/regex_engine.rs +++ b/libdd-common/src/regex_engine.rs @@ -13,7 +13,7 @@ //! regexes requiring Unicode character class support. #[cfg(all(feature = "regex-lite", not(feature = "require-regex-full")))] -pub use regex_lite::{escape, Captures, Regex, RegexBuilder, Replacer}; +pub use regex_lite::{escape, Captures, Error, Regex, RegexBuilder, Replacer}; #[cfg(not(all(feature = "regex-lite", not(feature = "require-regex-full"))))] -pub use regex::{escape, Captures, Regex, RegexBuilder, Replacer}; +pub use regex::{escape, Captures, Error, Regex, RegexBuilder, Replacer}; diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index bb93a10a59..00578c4eeb 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -36,6 +36,7 @@ libdd-common = { version = "4.2.0", path = "../libdd-common", default-features = libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime", default-features = false } libdd-telemetry = { version = "5.0.1", path = "../libdd-telemetry", default-features = false, optional = true} libdd-trace-protobuf = { version = "3.0.2", path = "../libdd-trace-protobuf" } +libdd-trace-normalization = { version = "2.0.0", path = "../libdd-trace-normalization" } libdd-trace-stats = { version = "5.0.0", path = "../libdd-trace-stats", default-features = false } libdd-trace-utils = { version = "8.0.0", path = "../libdd-trace-utils", default-features = false } libdd-trace-obfuscation = { version = "4.0.0", path = "../libdd-trace-obfuscation", default-features = false, optional = true } diff --git a/libdd-data-pipeline/src/agent_info/schema.rs b/libdd-data-pipeline/src/agent_info/schema.rs index 7b7bfc4e3a..0b70b77164 100644 --- a/libdd-data-pipeline/src/agent_info/schema.rs +++ b/libdd-data-pipeline/src/agent_info/schema.rs @@ -40,20 +40,25 @@ pub struct AgentInfoStruct { /// Container tags hash from HTTP response header pub container_tags_hash: Option, /// Exact-match tag filters applied before stats computation (root span only). - pub filter_tags: Option, + #[serde(default)] + pub filter_tags: FilterTagsConfig, /// Regex-match tag filters applied before stats computation (root span only). - pub filter_tags_regex: Option, + #[serde(default)] + pub filter_tags_regex: FilterTagsConfig, /// Regex patterns for root-span resource names; matching traces are excluded from stats. - pub ignore_resources: Option>, + #[serde(default)] + pub ignore_resources: Vec, } /// Require/reject lists for tag-based trace filters exposed by the agent /info endpoint. #[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] pub struct FilterTagsConfig { /// All listed filters must match at least one root-span tag for the trace to be accepted. - pub require: Option>, + #[serde(default)] + pub require: Vec, /// If any listed filter matches a root-span tag the trace is rejected. - pub reject: Option>, + #[serde(default)] + pub reject: Vec, } #[allow(missing_docs)] diff --git a/libdd-data-pipeline/src/telemetry/metrics.rs b/libdd-data-pipeline/src/telemetry/metrics.rs index feeb74e492..f610c7e619 100644 --- a/libdd-data-pipeline/src/telemetry/metrics.rs +++ b/libdd-data-pipeline/src/telemetry/metrics.rs @@ -27,6 +27,8 @@ pub enum MetricKind { ChunksSent, /// trace_chunks_dropped metric (reason: p0_drop) ChunksDroppedP0, + /// trace_chunks_dropped metric (reason: trace_filters) + ChunksDroppedByTraceFilter, /// trace_chunks_dropped metric (reason: serialization_error) ChunksDroppedSerializationError, /// trace_chunks_dropped metric (reason: send_failure) @@ -102,6 +104,15 @@ const METRICS: &[Metric] = &[ namespace: MetricNamespace::Tracers, tags: &[tag!["src_library", "libdatadog"], tag!["reason", "p0_drop"]], }, + Metric { + name: CHUNKS_DROPPED_STR, + metric_type: MetricType::Count, + namespace: MetricNamespace::Tracers, + tags: &[ + tag!["src_library", "libdatadog"], + tag!["reason", "trace_filters"], + ], + }, Metric { name: CHUNKS_DROPPED_STR, metric_type: MetricType::Count, diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 9bf4177b5a..cd196e3556 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -13,6 +13,7 @@ use libdd_telemetry::worker::{ }; use libdd_trace_utils::{ send_with_retry::{SendWithRetryError, SendWithRetryResult}, + span::trace_utils::DroppedStats, trace_utils::SendDataResult, }; use std::{collections::HashMap, time::Duration}; @@ -165,6 +166,7 @@ pub struct SendPayloadTelemetry { bytes_sent: u64, chunks_sent: u64, chunks_dropped_p0: u64, + chunks_dropped_by_trace_filter: u64, chunks_dropped_serialization_error: u64, chunks_dropped_send_failure: u64, responses_count_per_code: HashMap, @@ -193,15 +195,16 @@ impl SendPayloadTelemetry { /// * `value` - The result of sending traces with retry /// * `bytes_sent` - The number of bytes in the payload /// * `chunks` - The number of trace chunks in the payload - /// * `chunks_dropped_p0` - The number of P0 trace chunks dropped due to sampling + /// * `dropped_stats` - Trace dropped stats from `stats::process_traces_for_stats` pub fn from_retry_result( value: &SendWithRetryResult, bytes_sent: u64, chunks: u64, - chunks_dropped_p0: u64, + dropped_stats: DroppedStats, ) -> Self { let mut telemetry = Self { - chunks_dropped_p0, + chunks_dropped_p0: dropped_stats.dropped_p0_traces as u64, + chunks_dropped_by_trace_filter: dropped_stats.dropped_by_trace_filter as u64, ..Default::default() }; match value { @@ -288,6 +291,13 @@ impl TelemetryClient { self.worker .add_point(data.chunks_dropped_p0 as f64, key, vec![])?; } + if data.chunks_dropped_by_trace_filter > 0 { + let key = self + .metrics + .get(metrics::MetricKind::ChunksDroppedByTraceFilter); + self.worker + .add_point(data.chunks_dropped_by_trace_filter as f64, key, vec![])?; + } if data.chunks_dropped_serialization_error > 0 { let key = self .metrics @@ -704,7 +714,16 @@ mod tests { .unwrap(), 3, )); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5, 0); + let telemetry = SendPayloadTelemetry::from_retry_result( + &result, + 4, + 5, + DroppedStats { + dropped_p0_traces: 0, + dropped_p0_spans: 0, + dropped_by_trace_filter: 0, + }, + ); assert_eq!( telemetry, SendPayloadTelemetry { @@ -726,7 +745,16 @@ mod tests { .unwrap(), 3, )); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5, 10); + let telemetry = SendPayloadTelemetry::from_retry_result( + &result, + 4, + 5, + DroppedStats { + dropped_p0_traces: 10, + dropped_p0_spans: 0, + dropped_by_trace_filter: 0, + }, + ); assert_eq!( telemetry, SendPayloadTelemetry { @@ -747,7 +775,16 @@ mod tests { .body(Bytes::new()) .unwrap(); let result = Err(SendWithRetryError::Http(error_response, 5)); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0); + let telemetry = SendPayloadTelemetry::from_retry_result( + &result, + 1, + 2, + DroppedStats { + dropped_p0_traces: 0, + dropped_p0_spans: 0, + dropped_by_trace_filter: 0, + }, + ); assert_eq!( telemetry, SendPayloadTelemetry { @@ -766,7 +803,16 @@ mod tests { HttpError::Network(anyhow::anyhow!("connection refused")), 5, )); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0); + let telemetry = SendPayloadTelemetry::from_retry_result( + &result, + 1, + 2, + DroppedStats { + dropped_p0_traces: 0, + dropped_p0_spans: 0, + dropped_by_trace_filter: 0, + }, + ); assert_eq!( telemetry, SendPayloadTelemetry { @@ -781,7 +827,16 @@ mod tests { #[test] fn telemetry_from_timeout_error_test() { let result = Err(SendWithRetryError::Timeout(5)); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0); + let telemetry = SendPayloadTelemetry::from_retry_result( + &result, + 1, + 2, + DroppedStats { + dropped_p0_traces: 0, + dropped_p0_spans: 0, + dropped_by_trace_filter: 0, + }, + ); assert_eq!( telemetry, SendPayloadTelemetry { @@ -797,7 +852,16 @@ mod tests { #[test] fn telemetry_from_build_error_test() { let result = Err(SendWithRetryError::Build(5)); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0); + let telemetry = SendPayloadTelemetry::from_retry_result( + &result, + 1, + 2, + DroppedStats { + dropped_p0_traces: 0, + dropped_p0_spans: 0, + dropped_by_trace_filter: 0, + }, + ); assert_eq!( telemetry, SendPayloadTelemetry { diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 3c0e1f14b5..577280550a 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -22,6 +22,7 @@ use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; use libdd_common::{parse_uri, tag, Endpoint}; use libdd_dogstatsd_client::new; use libdd_shared_runtime::SharedRuntime; +use libdd_trace_utils::trace_filter::TraceFilterer; use std::sync::Arc; use std::time::Duration; @@ -514,6 +515,7 @@ impl TraceExporterBuilder { .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), otlp_config, + trace_filterer: ArcSwap::from_pointee(TraceFilterer::with_empty_conf()), }) } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 070fc754e0..0486a32678 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -9,6 +9,7 @@ mod trace_serializer; // Re-export the builder pub use builder::TraceExporterBuilder; +use libdd_trace_utils::{span::trace_utils::DroppedStats, trace_filter::TraceFilterer}; use self::agent_response::AgentResponse; use self::metrics::MetricsEmitter; @@ -30,7 +31,7 @@ use crate::{ health_metrics, health_metrics::{HealthMetric, SendResult, TransportErrorType}, }; -use arc_swap::ArcSwapOption; +use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; use http::header::HeaderMap; use http::uri::PathAndQuery; @@ -213,6 +214,7 @@ pub struct TraceExporter, /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. otlp_config: Option, + trace_filterer: ArcSwap, } impl TraceExporter { @@ -348,9 +350,9 @@ impl Tra .map(|s| s.as_str()) } - #[cfg(not(target_arch = "wasm32"))] /// Reconcile in-process stats state with the latest agent info. /// Async so the `Enabled` arm can await a stats-worker shutdown without `block_on`. + #[cfg(not(target_arch = "wasm32"))] async fn check_agent_info(&self) { let Some(agent_info) = agent_info::get_agent_info() else { return; @@ -363,6 +365,14 @@ impl Tra self.refresh_v1_active(&agent_info); } + self.trace_filterer.store(Arc::new(TraceFilterer::new( + &agent_info.info.filter_tags.require, + &agent_info.info.filter_tags.reject, + &agent_info.info.filter_tags_regex.require, + &agent_info.info.filter_tags_regex.reject, + &agent_info.info.ignore_resources, + ))); + // load_full() avoids holding an ArcSwap Guard (!Send) across .await. let status = self.client_side_stats.status.load_full(); match &*status { @@ -567,7 +577,8 @@ impl Tra mp_payload: Vec, headers: HeaderMap, chunks: usize, - #[cfg_attr(not(feature = "telemetry"), allow(unused_variables))] chunks_dropped_p0: usize, + #[cfg_attr(not(feature = "telemetry"), allow(unused_variables))] + dropped_stats: DroppedStats, ) -> Result { let strategy = RetryStrategy::default(); let payload_len = mp_payload.len(); @@ -588,7 +599,7 @@ impl Tra &result, payload_len as u64, chunks as u64, - chunks_dropped_p0 as u64, + dropped_stats, )) { error!(?e, "Error sending telemetry"); } @@ -605,11 +616,12 @@ impl Tra // Process stats computation and drop non-sampled (p0) chunks. // This must run before the OTLP path so that unsampled spans are not exported. - let dropped_p0_stats = stats::process_traces_for_stats( + let dropped_stats = stats::process_traces_for_stats( &mut traces, &mut header_tags, &self.client_side_stats.status, self.client_computed_top_level, + &self.trace_filterer.load(), ); for chunk in &mut traces { @@ -662,7 +674,7 @@ impl Tra prepared.data, prepared.headers, prepared.chunk_count, - dropped_p0_stats.dropped_p0_traces, + dropped_stats, ) .await; diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index ee715796ba..93d668781b 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -7,6 +7,9 @@ //! including starting/stopping stats workers, managing the span concentrator, //! and processing traces for stats collection. +#[cfg(not(target_arch = "wasm32"))] +use super::add_path; +use super::TracerMetadata; #[cfg(not(target_arch = "wasm32"))] use crate::agent_info::schema::AgentInfo; use arc_swap::ArcSwap; @@ -22,15 +25,12 @@ use libdd_trace_stats::span_concentrator::{ }; #[cfg(not(target_arch = "wasm32"))] use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata}; +use libdd_trace_utils::trace_filter::TraceFilterer; use std::sync::{Arc, Mutex}; use std::time::Duration; #[cfg(not(target_arch = "wasm32"))] use tracing::{debug, error}; -#[cfg(not(target_arch = "wasm32"))] -use super::add_path; -use super::TracerMetadata; - #[cfg(not(target_arch = "wasm32"))] pub(crate) const DEFAULT_STATS_ELIGIBLE_SPAN_KINDS: [&str; 4] = ["client", "server", "producer", "consumer"]; @@ -305,12 +305,15 @@ pub(crate) fn process_traces_for_stats( header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags, client_side_stats: &ArcSwap, client_computed_top_level: bool, -) -> libdd_trace_utils::span::trace_utils::DroppedP0Stats { + trace_filterer: &TraceFilterer, +) -> libdd_trace_utils::span::trace_utils::DroppedStats { let status = client_side_stats.load(); if let StatsComputationStatus::Enabled { stats_concentrator, .. } = &**status { + let dropped_by_trace_filter = trace_filterer.filter_traces(traces); + if !client_computed_top_level { for chunk in traces.iter_mut() { libdd_trace_utils::span::trace_utils::compute_top_level_span(chunk); @@ -319,20 +322,22 @@ pub(crate) fn process_traces_for_stats( add_spans_to_stats(stats_concentrator, traces); // Once stats have been computed we can drop all chunks that are not going to be // sampled by the agent - let dropped_p0_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces); + let mut dropped_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces); + dropped_stats.dropped_by_trace_filter = dropped_by_trace_filter; // Update the headers to indicate that stats have been computed and forward dropped // traces counts header_tags.client_computed_top_level = true; header_tags.client_computed_stats = true; - header_tags.dropped_p0_traces = dropped_p0_stats.dropped_p0_traces; - header_tags.dropped_p0_spans = dropped_p0_stats.dropped_p0_spans; + header_tags.dropped_p0_traces = dropped_stats.dropped_p0_traces; + header_tags.dropped_p0_spans = dropped_stats.dropped_p0_spans; - dropped_p0_stats + dropped_stats } else { - libdd_trace_utils::span::trace_utils::DroppedP0Stats { + libdd_trace_utils::span::trace_utils::DroppedStats { dropped_p0_traces: 0, dropped_p0_spans: 0, + dropped_by_trace_filter: 0, } } } diff --git a/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json b/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json index 039057d503..1c7db3b29a 100644 --- a/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json +++ b/libdd-data-pipeline/tests/snapshots/compare_exporter_v04_to_v1_trace_snapshot_test.json @@ -11,7 +11,6 @@ "_dd.hostname": "my-host", "_dd.origin": "lambda", "_dd.p.dm": "-4", - "_dd.p.tid": "0x0", "component": "http", "env": "test-env", "runtime-id": "test-runtime-id-value", @@ -35,8 +34,6 @@ "parent_id": 1, "meta": { "_dd.origin": "lambda", - "_dd.p.dm": "-4", - "_dd.p.tid": "0x0", "env": "test-env", "runtime-id": "test-runtime-id-value", "service": "test-service", diff --git a/libdd-data-pipeline/tests/snapshots/trace_filters_snapshot_test.json b/libdd-data-pipeline/tests/snapshots/trace_filters_snapshot_test.json new file mode 100644 index 0000000000..97cc5c70e8 --- /dev/null +++ b/libdd-data-pipeline/tests/snapshots/trace_filters_snapshot_test.json @@ -0,0 +1,36 @@ +[ + [ + { + "service": "test-service", + "name": "passes_filters_first", + "resource": "test", + "trace_id": 1, + "span_id": 1, + "start": 0, + "duration": 5, + "meta": { + "my_require_tag": "true" + }, + "metrics": { + "_top_level": 1.0 + } + } + ], + [ + { + "service": "test-service", + "name": "passes_filters_last", + "resource": "test2", + "trace_id": 1, + "span_id": 1, + "start": 0, + "duration": 5, + "meta": { + "my_require_tag": "true" + }, + "metrics": { + "_top_level": 1.0 + } + } + ] +] diff --git a/libdd-data-pipeline/tests/test_trace_filters.rs b/libdd-data-pipeline/tests/test_trace_filters.rs new file mode 100644 index 0000000000..12c07e9be1 --- /dev/null +++ b/libdd-data-pipeline/tests/test_trace_filters.rs @@ -0,0 +1,167 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; + +use libdd_capabilities_impl::NativeCapabilities; +use libdd_data_pipeline::{ + agent_info, + trace_exporter::{TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat}, +}; +use libdd_trace_utils::test_utils::datadog_test_agent::DatadogTestAgent; +use rand::Rng; +use serde_json::json; + +mod tracing_integration_tests { + use super::*; + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn trace_filters_snapshot_test() { + const EXTRA_INFO: &str = r#"{ + "version":"1", + "filter_tags": {"reject": ["my_ignore_tag"], "require": ["my_require_tag:true"]}, + "filter_tags_regex": {"reject": ["my_regex_ignore_tag:.*true.*"]}, + "ignore_resources": [".*IGNORED.*"] + }"#; + let relative_snapshot_path = "libdd-data-pipeline/tests/snapshots/"; + let snapshot_name = "trace_filters_snapshot_test"; + let test_agent = DatadogTestAgent::new( + Some(relative_snapshot_path), + None, + &[("DD_AGENT_EXTRA_INFO", EXTRA_INFO)], + ) + .await; + let url = test_agent.get_base_uri().await; + test_agent.start_session(snapshot_name, None).await; + + let mut builder = TraceExporter::::builder(); + builder + .enable_stats(Duration::from_secs(10)) + .set_env("staging") + .set_language("nodejs") + .set_language_interpreter("v8") + .set_language_version("1.0") + .set_service("test") + .set_test_session_token(snapshot_name) + .set_tracer_version("1.0") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V04) + .set_url(url.to_string().as_ref()); + + let trace_exporter = builder + .build_async::() + .await + .expect("Unable to build TraceExporter"); + let data = get_v04_trace_snapshot_test_payload(); + let timeout = Duration::from_secs(2); + let start = Instant::now(); + loop { + if std::time::Instant::now().duration_since(start) > timeout { + panic!("Timeout waiting for agent info to be ready"); + } + if agent_info::get_agent_info().is_some() { + break; + } + std::thread::sleep(Duration::from_millis(10)); + } + + let response = trace_exporter.send_async(data.as_ref()).await; + assert!(response.is_ok()); + + tokio::task::spawn_blocking(move || drop(trace_exporter)) + .await + .unwrap(); + + let received_traces = test_agent.get_sent_traces().await; + + println!( + "{}", + serde_json::to_string_pretty(&received_traces).unwrap() + ); + + test_agent.assert_snapshot(snapshot_name).await; + } +} + +fn get_v04_trace_snapshot_test_payload() -> Vec { + let traces = vec![ + trace_1_span( + "passes_filters_first", + "test", + &[("my_require_tag", "true")], + ), + // This one gets filtered out because it matches an ignore_resources pattern + trace_1_span( + "ignored_resource", + "test IGNORED resource test", + &[("my_require_tag", "true")], + ), + // This one gets filtered out because one of its tag matches a reject filter_tag + trace_1_span( + "reject_filter_tag", + "test ignored because of reject filter_tag", + &[("my_ignore_tag", ""), ("my_require_tag", "true")], + ), + // This one gets filtered out because one of its tag matches a reject + // regex_filter_tag + trace_1_span( + "reject_rejex_filter_tag", + "test ignored because of reject regex_filter_tag", + &[ + ("my_regex_ignore_tag", "something-true-something"), + ("my_require_tag", "true"), + ], + ), + // This one gets filtered out because it doesn't have my_require_tag:true + trace_1_span( + "missing_required_filter_tag", + "test ignored because missing a required filter_tag", + &[("a_useless_tag", "true")], + ), + // This one gets filtered out because it doesn't have my_require_tag:true + trace_1_span( + "missing_required_filter_tag_value", + "test ignored because wrong value on filter_tag", + &[("my_require_tag", "false")], + ), + trace_1_span( + "passes_filters_last", + "test2", + &[("my_require_tag", "true")], + ), + ]; + rmp_serde::to_vec_named(&traces).unwrap() +} + +pub fn trace_1_span(name: &str, resource: &str, meta: &[(&str, &str)]) -> Vec { + vec![span(name, resource, meta)] +} + +pub fn span(name: &str, resource: &str, meta: &[(&str, &str)]) -> serde_json::Value { + let trace_id: u32 = rand::thread_rng().gen(); + let span_id: u32 = rand::thread_rng().gen(); + let meta: HashMap<&str, &str> = HashMap::from_iter(meta.iter().copied()); + + json!( + { + "name": name, + "resource": resource, + "meta": meta, + "trace_id": trace_id, + "span_id": span_id, + "parent_id": 0, + "service": "test-service", + "start": 0, + "duration": 5, + "error": 0, + "metrics": {}, + "meta_struct": {}, + "span_links": [], + "span_events": [], + } + ) +} diff --git a/libdd-trace-normalization/src/normalize_utils.rs b/libdd-trace-normalization/src/normalize_utils.rs index b70093c817..fab5d43cae 100644 --- a/libdd-trace-normalization/src/normalize_utils.rs +++ b/libdd-trace-normalization/src/normalize_utils.rs @@ -83,6 +83,12 @@ pub fn normalize_parent_id(parent_id: &mut u64, trace_id: u64, span_id: u64) { } } +pub fn normalize_tag_cloned(tag: &str) -> String { + let mut tag = tag.to_owned(); + normalize_tag(&mut tag); + tag +} + pub fn normalize_tag(tag: &mut String) { // Since we know that we're only going to write valid utf8 we can work with the Vec directly let bytes = unsafe { tag.as_mut_vec() }; diff --git a/libdd-trace-normalization/src/normalizer.rs b/libdd-trace-normalization/src/normalizer.rs index 7450dad908..ee5790cbc6 100644 --- a/libdd-trace-normalization/src/normalizer.rs +++ b/libdd-trace-normalization/src/normalizer.rs @@ -35,7 +35,7 @@ pub(crate) fn normalize_span(s: &mut pb::Span) -> anyhow::Result<()> { } if let Some(code) = s.meta.get("http.status_code") { - if !is_valid_status_code(code) { + if !is_valid_http_status_code(code) { s.meta.remove("http.status_code"); } }; @@ -43,7 +43,7 @@ pub(crate) fn normalize_span(s: &mut pb::Span) -> anyhow::Result<()> { Ok(()) } -pub(crate) fn is_valid_status_code(sc: &str) -> bool { +pub fn is_valid_http_status_code(sc: &str) -> bool { if let Ok(code) = sc.parse::() { return (100..600).contains(&code); } @@ -476,11 +476,13 @@ mod tests { #[test] fn test_is_valid_status_code() { - assert!(normalizer::is_valid_status_code("100")); - assert!(normalizer::is_valid_status_code("599")); - assert!(!normalizer::is_valid_status_code("99")); - assert!(!normalizer::is_valid_status_code("600")); - assert!(!normalizer::is_valid_status_code("Invalid status code")); + assert!(normalizer::is_valid_http_status_code("100")); + assert!(normalizer::is_valid_http_status_code("599")); + assert!(!normalizer::is_valid_http_status_code("99")); + assert!(!normalizer::is_valid_http_status_code("600")); + assert!(!normalizer::is_valid_http_status_code( + "Invalid status code" + )); } #[test] diff --git a/libdd-trace-utils/src/lib.rs b/libdd-trace-utils/src/lib.rs index 289593bbb3..8f08c0ac06 100644 --- a/libdd-trace-utils/src/lib.rs +++ b/libdd-trace-utils/src/lib.rs @@ -16,6 +16,7 @@ pub mod send_with_retry; pub mod stats_utils; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; +pub mod trace_filter; pub mod trace_utils; pub mod tracer_header_tags; pub mod tracer_metadata; diff --git a/libdd-trace-utils/src/span/trace_utils.rs b/libdd-trace-utils/src/span/trace_utils.rs index 00faad9516..54699a484e 100644 --- a/libdd-trace-utils/src/span/trace_utils.rs +++ b/libdd-trace-utils/src/span/trace_utils.rs @@ -3,8 +3,10 @@ //! Trace-utils functionalities implementation for tinybytes based spans +use tracing::debug; + use super::{v04::Span, SpanText, TraceData}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; /// Span metric the mini agent must set for the backend to recognize top level span const TOP_LEVEL_KEY: &str = "_top_level"; @@ -56,6 +58,49 @@ where } } +pub fn get_root_span_index(trace: &[Span]) -> anyhow::Result +where + T: TraceData, +{ + if trace.is_empty() { + anyhow::bail!("Cannot find root span index in an empty trace."); + } + + // Do a first pass to find if we have an obvious root span (starting from the end) since some + // clients put the root span last. + for (i, span) in trace.iter().enumerate().rev() { + if span.parent_id == 0 { + return Ok(i); + } + } + + let span_ids: HashSet<_> = trace.iter().map(|span| span.span_id).collect(); + + let mut root_span_id = None; + for (i, span) in trace.iter().enumerate() { + // If a span's parent is not in the trace, it is a root + if !span_ids.contains(&span.parent_id) { + if root_span_id.is_some() { + debug!( + trace_id = &trace[0].trace_id, + "trace has multiple root spans" + ); + } + root_span_id = Some(i); + } + } + Ok(match root_span_id { + Some(i) => i, + None => { + debug!( + trace_id = &trace[0].trace_id, + "Could not find the root span for trace" + ); + trace.len() - 1 + } + }) +} + /// Return true if the span has a top level key set pub fn has_top_level(span: &Span) -> bool { span.metrics @@ -80,9 +125,10 @@ pub fn is_partial_snapshot(span: &Span) -> bool { .is_some_and(|v| *v >= 0.0) } -pub struct DroppedP0Stats { +pub struct DroppedStats { pub dropped_p0_traces: usize, pub dropped_p0_spans: usize, + pub dropped_by_trace_filter: usize, } // Keys used for sampling @@ -100,7 +146,7 @@ const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr"; /// /// # Trace-level attributes /// Some attributes related to the whole trace are stored in the root span of the chunk. -pub fn drop_chunks(traces: &mut Vec>>) -> DroppedP0Stats +pub fn drop_chunks(traces: &mut Vec>>) -> DroppedStats where T: TraceData, { @@ -151,9 +197,10 @@ where true }); - DroppedP0Stats { + DroppedStats { dropped_p0_traces, dropped_p0_spans, + dropped_by_trace_filter: 0, } } diff --git a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs index 7de186baec..528db31723 100644 --- a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs +++ b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs @@ -14,7 +14,7 @@ use std::str::FromStr; use std::time::Duration; const TEST_AGENT_IMAGE_NAME: &str = "ghcr.io/datadog/dd-apm-test-agent/ddapm-test-agent"; -const TEST_AGENT_IMAGE_TAG: &str = "v1.56.0"; +const TEST_AGENT_IMAGE_TAG: &str = "v1.61.1"; const TEST_AGENT_READY_MSG: &str = "INFO:ddapm_test_agent.agent:Trace request stall seconds setting set to 0.0."; diff --git a/libdd-trace-utils/src/trace_filter.rs b/libdd-trace-utils/src/trace_filter.rs new file mode 100644 index 0000000000..235c4e3903 --- /dev/null +++ b/libdd-trace-utils/src/trace_filter.rs @@ -0,0 +1,663 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 +//! Trace-level filter logic for client-side stats (filter_tags, filter_tags_regex, +//! ignore_resources as published by the agent's /info endpoint). +use std::borrow::Borrow as _; + +use libdd_common::regex_engine::Regex; +use libdd_trace_normalization::{normalize_utils, normalizer}; +use tracing::{debug, error}; + +use crate::span::{self, trace_utils::get_root_span_index, TraceData}; + +trait TagFilter { + /// Returns true if the given tag value matches the Filterer. + fn matches_tag_value(&self, value: &str) -> bool; + // Getter to the key field + fn key(&self) -> &str; +} + +#[derive(Debug)] +struct TagLiteralFilter { + key: String, + value: Option, +} + +#[derive(Debug)] +struct TagRegexFilter { + key: String, + value: Option, +} + +/// Applies trace-level filters derived from the agent's `/info` endpoint configuration: +/// `filter_tags`, `filter_tags_regex`, and `ignore_resources`. +/// +/// Filtering is evaluated on the root span of each trace. +#[derive(Debug, Default)] +pub struct TraceFilterer { + reject: Vec, + reject_regex: Vec, + + require: Vec, + require_regex: Vec, + + ignore_resources: Vec, +} + +/// Minimal span interface required by [`TraceFilterer`]. +pub trait Span<'a> { + /// Returns the normalized resource value + fn resource_normalized(&'a self) -> &'a str; + /// Returns the value of the given meta tag, if present. + fn get_meta(&'a self, key: &str) -> Option<&'a str>; +} + +impl TagFilter for TagLiteralFilter { + fn matches_tag_value(&self, value: &str) -> bool { + match &self.value { + None => true, // No value requirement => Any value is a match + Some(required_value) => value == required_value, + } + } + + fn key(&self) -> &str { + &self.key + } +} + +impl TagFilter for TagRegexFilter { + fn matches_tag_value(&self, value: &str) -> bool { + match &self.value { + None => true, // No value requirement => Any value is a match + Some(pattern) => pattern.is_match(value), + } + } + + fn key(&self) -> &str { + &self.key + } +} + +impl<'a, T: TraceData> Span<'a> for span::v04::Span { + fn resource_normalized(&'a self) -> &'a str { + // Normalization + let span_resource = self.resource.borrow(); + if span_resource.is_empty() { + let span_name = self.name.borrow(); + debug!( + ?span_name, + "Trace filter: filtering on name because resource is empty" + ); + span_name + } else { + span_resource + } + } + + fn get_meta(&'a self, key: &str) -> Option<&'a str> { + self.meta.get(key).map(|v| v.borrow()) + } +} + +impl TraceFilterer { + fn compile_literal_filters(filters: &[String]) -> Vec { + let mut tag_regex_filters = Vec::new(); + for filter in filters { + let (key, value) = match filter.split_once(":") { + Some((key, value)) if !value.trim().is_empty() => { + (key.trim(), Some(value.trim().to_owned())) + } + _ => (filter.trim(), None), + }; + if key.is_empty() { + error!( + ?filter, + "Invalid tag filter with empty key value, skipping it" + ); + continue; + } + + tag_regex_filters.push(TagLiteralFilter { + key: key.to_owned(), + value, + }); + } + + tag_regex_filters + } + + fn compile_regex_filters(filters: &[String]) -> Vec { + let mut tag_regex_filters = Vec::new(); + for filter in filters { + let (key, value) = match filter.split_once(":") { + Some((key, value)) if !value.trim().is_empty() => (key.trim(), Some(value.trim())), + _ => (filter.trim(), None), + }; + if key.is_empty() { + error!( + ?filter, + "Invalid tag filter with empty key value, skipping it" + ); + continue; + } + + let value = match value { + Some(value) => match Regex::new(value) { + Ok(regex) => Some(regex), + Err(err) => { + error!( + ?filter, + ?err, + "Invalid regex pattern in tag filter's value, skipping it" + ); + continue; + } + }, + None => None, + }; + + tag_regex_filters.push(TagRegexFilter { + key: key.to_owned(), + value, + }); + } + + tag_regex_filters + } + + fn compile_resource_filters(ignore_resources: &[String]) -> Vec { + ignore_resources + .iter() + .filter_map(|regex| { + Regex::new(regex) + .inspect_err(|err| { + error!( + ?regex, + ?err, + "Invalid regex pattern in ignore resources filter, skipping it" + ) + }) + .ok() + }) + .collect() + } + + /// Creates a new filterer from the agent's `/info` configuration fields. + /// + /// Invalid regex patterns are logged and skipped rather than causing an error. + pub fn new( + filter_tags_require: &[String], + filter_tags_reject: &[String], + filter_tags_regex_require: &[String], + filter_tags_regex_reject: &[String], + ignore_resources: &[String], + ) -> Self { + let require_regex = Self::compile_regex_filters(filter_tags_regex_require); + let reject_regex = Self::compile_regex_filters(filter_tags_regex_reject); + let require = Self::compile_literal_filters(filter_tags_require); + let reject = Self::compile_literal_filters(filter_tags_reject); + let ignore_resources = Self::compile_resource_filters(ignore_resources); + + Self { + reject, + require, + reject_regex, + require_regex, + ignore_resources, + } + } + /// Creates a no-op filterer that keeps all traces. + pub fn with_empty_conf() -> Self { + Self::default() + } + + /// Removes traces that fail filter checks in-place. Returns the number of traces dropped. + pub fn filter_traces(&self, traces: &mut Vec>>) -> usize { + let traces_count_before = traces.len(); + traces.retain(|trace| { + let Ok(root_span_index) = get_root_span_index(trace) else { + return true; + }; + let should_drop = self.should_drop(&trace[root_span_index]); + if should_drop { + debug!("Trace rejected as it fails to meet tag requirements. root: %v"); + } + !should_drop + }); + let traces_count_after = traces.len(); + + traces_count_before - traces_count_after + } + + /// Checks if the trace with root span `root_span` should be dropped based on filter + /// configuration. + /// + /// Applies a subset of trace normalization logic from `libdd-trace-normalization` before + /// checking. + // 1. Resource filtering: If the root span's resource name matches any pattern in + // ignore_resources, reject the trace. + // 2. Reject filtering: If any tag on the root span matches filters in filter_tags.reject or + // filter_tags_regex.reject, reject the trace. + // 3. Require filtering: If filter_tags.require or filter_tags_regex.require contain any + // filters, all of them must match tags on the root span. If any required filter doesn't + // match, reject the trace. + pub fn should_drop<'a>(&self, root_span: &'a impl Span<'a>) -> bool { + if !self.ignore_resources.is_empty() { + let span_resource = root_span.resource_normalized(); + + if self + .ignore_resources + .iter() + .any(|resource_pattern| resource_pattern.is_match(span_resource)) + { + return true; + } + } + + if self + .reject + .iter() + .any(|filter| Self::check_tag_filter_with_normalization(filter, root_span)) + { + return true; + } + + if self + .reject_regex + .iter() + .any(|filter| Self::check_tag_filter_with_normalization(filter, root_span)) + { + return true; + } + + if !self + .require + .iter() + .all(|filter| Self::check_tag_filter_with_normalization(filter, root_span)) + { + return true; + } + + if !self + .require_regex + .iter() + .all(|filter| Self::check_tag_filter_with_normalization(filter, root_span)) + { + return true; + } + + false + } + + fn check_tag_filter_with_normalization<'a>( + filter: &impl TagFilter, + root_span: &'a impl Span<'a>, + ) -> bool { + let Some(value) = root_span.get_meta(filter.key()) else { + return false; + }; + match filter.key() { + "env" => { + let normalized_value = normalize_utils::normalize_tag_cloned(value); + filter.matches_tag_value(&normalized_value) + } + "http.status_code" => { + if !normalizer::is_valid_http_status_code(value) { + debug!(?value,"trace filter on http.status_code ignored because root span's `http.status_code` is invalid"); + return false; + } + filter.matches_tag_value(value) + } + _ => filter.matches_tag_value(value), + } + } +} + +#[cfg(test)] +mod tests { + use super::TraceFilterer; + use crate::span::v04::{SpanBytes, VecMap}; + // ---- helpers ---- + + fn span_with(resource: &'static str, meta: &[(&'static str, &'static str)]) -> SpanBytes { + SpanBytes { + service: "svc".into(), + name: "op".into(), + resource: resource.into(), + span_id: 1, + trace_id: 1, + parent_id: 0, + meta: meta + .iter() + .map(|(k, v)| ((*k).into(), (*v).into())) + .collect::>(), + ..Default::default() + } + } + + fn one_trace(s: SpanBytes) -> Vec> { + vec![vec![s]] + } + + fn map_to_owned(values: &[&str]) -> Vec { + values.iter().map(|&s| s.to_owned()).collect() + } + + fn require_str(tags: &[&str]) -> TraceFilterer { + TraceFilterer::new(&map_to_owned(tags), &[], &[], &[], &[]) + } + + fn reject_str(tags: &[&str]) -> TraceFilterer { + TraceFilterer::new(&[], &map_to_owned(tags), &[], &[], &[]) + } + + fn require_regex(tags: &[&str]) -> TraceFilterer { + TraceFilterer::new(&[], &[], &map_to_owned(tags), &[], &[]) + } + + fn reject_regex(tags: &[&str]) -> TraceFilterer { + TraceFilterer::new(&[], &[], &[], &map_to_owned(tags), &[]) + } + + fn ignore_resources(patterns: &[&str]) -> TraceFilterer { + TraceFilterer::new(&[], &[], &[], &[], &map_to_owned(patterns)) + } + + // ---- reject (TagStringFilter) ---- + + #[test] + fn reject_string_exact_match_drops() { + let mut traces = one_trace(span_with("r", &[("env", "prod")])); + reject_str(&["env:prod"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + #[test] + fn reject_string_wrong_value_keeps() { + let mut traces = one_trace(span_with("r", &[("env", "staging")])); + reject_str(&["env:prod"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn reject_string_missing_tag_keeps() { + let mut traces = one_trace(span_with("r", &[])); + reject_str(&["env:prod"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn reject_string_key_only_matches_any_value() { + // A key-only filter (no `:value` part) matches regardless of the tag's value. + let mut traces = one_trace(span_with("r", &[("env", "anything")])); + reject_str(&["env"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + // ---- reject_regex (TagRegexFilter – literal key, regex value) ---- + + #[test] + fn reject_regex_value_match_drops() { + let mut traces = one_trace(span_with("r", &[("env", "production")])); + reject_regex(&["env:prod.*"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + #[test] + fn reject_regex_value_no_match_keeps() { + let mut traces = one_trace(span_with("r", &[("env", "staging")])); + reject_regex(&["env:prod.*"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + // ---- reject_key_regex ---- + // Checks that it's not implemented + + #[test] + fn reject_key_regex_key_and_value_match_drops() { + let mut traces = one_trace(span_with("r", &[("error", "timeout")])); + reject_regex(&["err.*:timeout"]).filter_traces(&mut traces); + // Regex keys are not implemented so it doesn't match + assert!(!traces.is_empty()); + } + + #[test] + fn reject_key_regex_wrong_value_keeps() { + let mut traces = one_trace(span_with("r", &[("error", "network")])); + reject_regex(&["err.*:timeout"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn reject_key_regex_missing_key_keeps() { + let mut traces = one_trace(span_with("r", &[])); + reject_regex(&["err.*:timeout"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + // ---- require (TagStringFilter) ---- + + #[test] + fn require_string_present_and_matching_keeps() { + let mut traces = one_trace(span_with("r", &[("env", "prod")])); + require_str(&["env:prod"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn require_string_missing_tag_drops() { + let mut traces = one_trace(span_with("r", &[])); + require_str(&["env:prod"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + #[test] + fn require_string_wrong_value_drops() { + let mut traces = one_trace(span_with("r", &[("env", "staging")])); + require_str(&["env:prod"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + // ---- require_regex (TagRegexFilter – literal key, regex value) ---- + + #[test] + fn require_regex_value_match_keeps() { + let mut traces = one_trace(span_with("r", &[("env", "production")])); + require_regex(&["env:prod.*"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn require_regex_missing_drops() { + let mut traces = one_trace(span_with("r", &[])); + require_regex(&["env:prod.*"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + // ---- require_key_regex ---- + // (Checks that it's not implemented) + + #[test] + fn require_key_regex_key_exists_keeps() { + let mut traces = one_trace(span_with("r", &[("error", "any")])); + require_regex(&["err.*"]).filter_traces(&mut traces); + // Regex keys are not implemented so it doesn't match + assert!(traces.is_empty()); + } + + #[test] + fn require_key_regex_missing_key_drops() { + let mut traces = one_trace(span_with("r", &[])); + require_regex(&["err.*"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + // ---- ignore_resources ---- + + #[test] + fn ignore_resources_match_drops() { + let mut traces = one_trace(span_with("GET /health", &[])); + ignore_resources(&["GET /health"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + #[test] + fn ignore_resources_no_match_keeps() { + let mut traces = one_trace(span_with("POST /data", &[])); + ignore_resources(&["GET /health"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn ignore_resources_empty_resource_falls_back_to_name() { + // When resource is empty the span's name field is used for matching. + // The helper sets name = "op", so ignore_resources("op") must drop it. + let mut traces = one_trace(span_with("", &[])); + ignore_resources(&["op"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + // ---- env tag normalization ---- + + #[test] + fn env_normalization_reject_matches_after_lowercase() { + // normalize_tag_cloned("PROD") == "prod"; the reject filter "env:prod" must fire. + let mut traces = one_trace(span_with("r", &[("env", "PROD")])); + reject_str(&["env:prod"]).filter_traces(&mut traces); + assert!( + traces.is_empty(), + "env value should be normalized before matching" + ); + } + + #[test] + fn env_normalization_require_matches_normalized_value() { + // normalize_tag_cloned("Prod Env") == "prod_env" (uppercase + space → underscore). + let mut traces = one_trace(span_with("r", &[("env", "Prod Env")])); + require_str(&["env:prod_env"]).filter_traces(&mut traces); + assert_eq!( + traces.len(), + 1, + "normalized env should satisfy the require filter" + ); + } + + // ---- http.status_code special handling ---- + + #[test] + fn http_status_code_invalid_value_skips_reject_filter() { + // is_valid_http_status_code("abc") == false → check_tag_filter returns false + // → reject never fires → trace kept even though the raw value equals the filter. + let mut traces = one_trace(span_with("r", &[("http.status_code", "abc")])); + reject_str(&["http.status_code:abc"]).filter_traces(&mut traces); + assert_eq!( + traces.len(), + 1, + "invalid status code should not trigger the filter" + ); + } + + #[test] + fn http_status_code_valid_value_triggers_reject_filter() { + let mut traces = one_trace(span_with("r", &[("http.status_code", "500")])); + reject_str(&["http.status_code:500"]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + // ---- edge / misc ---- + + #[test] + fn multiple_traces_partial_rejection() { + let f = reject_str(&["env:prod"]); + let mut traces = vec![ + vec![span_with("r", &[("env", "prod")])], // dropped + vec![span_with("r", &[("env", "staging")])], // kept + ]; + f.filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn no_filters_keeps_all_traces() { + let f = TraceFilterer::new(&[], &[], &[], &[], &[]); + let mut traces = vec![ + vec![span_with("r1", &[])], + vec![span_with("r2", &[("env", "prod")])], + ]; + f.filter_traces(&mut traces); + assert_eq!(traces.len(), 2); + } + + #[test] + fn invalid_regex_in_filter_is_skipped_gracefully() { + // A bad regex pattern is silently discarded; no panic, trace is kept. + let f = reject_regex(&["env:[invalid"]); + let mut traces = one_trace(span_with("r", &[("env", "anything")])); + f.filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + // ---- key/value trimming ---- + + #[test] + fn literal_reject_spaces_around_colon_drops() { + // " env : prod " → key="env", value="prod" + let mut traces = one_trace(span_with("r", &[("env", "prod")])); + reject_str(&[" env : prod "]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + #[test] + fn literal_require_spaces_around_colon_keeps() { + let mut traces = one_trace(span_with("r", &[("env", "prod")])); + require_str(&[" env : prod "]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn literal_reject_key_only_with_spaces_drops_any_value() { + // " env " (no colon) → key="env", no value requirement + let mut traces = one_trace(span_with("r", &[("env", "anything")])); + reject_str(&[" env "]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + #[test] + fn literal_reject_empty_key_is_skipped_keeps() { + // ":prod" → key="" → filter skipped → trace kept + let mut traces = one_trace(span_with("r", &[("env", "prod")])); + reject_str(&[":prod"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn literal_require_empty_key_is_skipped_keeps() { + // ":prod" → filter skipped → require list empty → vacuous all() → trace kept + let mut traces = one_trace(span_with("r", &[("env", "prod")])); + require_str(&[":prod"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn regex_reject_spaces_around_colon_drops() { + // " env : prod.* " → key="env", regex="prod.*" + let mut traces = one_trace(span_with("r", &[("env", "production")])); + reject_regex(&[" env : prod.* "]).filter_traces(&mut traces); + assert!(traces.is_empty()); + } + + #[test] + fn regex_require_spaces_around_colon_keeps() { + let mut traces = one_trace(span_with("r", &[("env", "production")])); + require_regex(&[" env : prod.* "]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } + + #[test] + fn regex_reject_empty_key_is_skipped_keeps() { + // ":prod.*" → key="" → filter skipped → trace kept + let mut traces = one_trace(span_with("r", &[("env", "prod")])); + reject_regex(&[":prod.*"]).filter_traces(&mut traces); + assert_eq!(traces.len(), 1); + } +} diff --git a/libdd-trace-utils/src/trace_utils.rs b/libdd-trace-utils/src/trace_utils.rs index dd0099b365..851ac54beb 100644 --- a/libdd-trace-utils/src/trace_utils.rs +++ b/libdd-trace-utils/src/trace_utils.rs @@ -381,10 +381,7 @@ pub fn get_root_span_index(trace: &[pb::Span]) -> anyhow::Result { } } - let mut span_ids: HashSet = HashSet::with_capacity(trace.len()); - for span in trace.iter() { - span_ids.insert(span.span_id); - } + let span_ids: HashSet<_> = trace.iter().map(|span| span.span_id).collect(); let mut root_span_id = None; for (i, span) in trace.iter().enumerate() {