diff --git a/include/datadog/telemetry/telemetry.h b/include/datadog/telemetry/telemetry.h index 01574f888..725abed31 100644 --- a/include/datadog/telemetry/telemetry.h +++ b/include/datadog/telemetry/telemetry.h @@ -44,6 +44,25 @@ void init(FinalizedConfiguration configuration, tracing::HTTPClient::URL agent_url, tracing::Clock clock = tracing::default_clock); +/// Deterministically shut down the telemetry module. +/// +/// Cancels all scheduled tasks, sends the app-closing payload, and waits up +/// to 2 seconds for in-flight requests to complete (HTTPClient::drain()). +/// Then releases the HTTP client reference; if this is the last shared_ptr +/// to the client, the background Curl thread is joined synchronously and any +/// remaining in-flight requests are abandoned without firing their callbacks. +/// If other holders of the client remain, the Curl thread continues running +/// and requests may still complete and fire their callbacks after this call +/// returns (the callbacks guard against this with weak_from_this()). After +/// this call all telemetry send functions become no-ops. Safe to call even if +/// telemetry was never initialized, but must be called at most once after a +/// successful init. +/// +/// Call this from the worker process exit path (e.g. before destroying the +/// tracer) so that telemetry's reference to the HTTP client is released +/// promptly, reducing the window before the Curl thread is quiesced. +void shutdown(); + /// Sends configuration changes. /// /// This function is responsible for sending reported configuration changes diff --git a/src/datadog/telemetry/telemetry.cpp b/src/datadog/telemetry/telemetry.cpp index 3cd515459..67b0c844c 100644 --- a/src/datadog/telemetry/telemetry.cpp +++ b/src/datadog/telemetry/telemetry.cpp @@ -22,7 +22,7 @@ using NoopTelemetry = std::monostate; /// `TelemetryProxy` holds either the real implementation or a no-op /// implementation. -using TelemetryProxy = std::variant; +using TelemetryProxy = std::variant>; /// NOTE(@dmehala): Here to facilitate Meyer's singleton construction. struct Ctor_param final { @@ -37,9 +37,9 @@ struct Ctor_param final { TelemetryProxy make_telemetry(const tracing::Optional& init) { if (!init || !init->configuration.enabled) return NoopTelemetry{}; - return Telemetry{init->configuration, init->tracer_signature, init->logger, - init->client, init->scheduler, init->agent_url, - init->clock}; + return Telemetry::create(init->configuration, init->tracer_signature, + init->logger, init->client, init->scheduler, + init->agent_url, init->clock); } TelemetryProxy& instance( @@ -69,20 +69,30 @@ void init(FinalizedConfiguration configuration, event_scheduler, agent_url, clock}); } -void send_configuration_change() { +void shutdown() { std::visit( details::Overload{ - [&](Telemetry& telemetry) { telemetry.send_configuration_change(); }, + [](std::shared_ptr& telemetry) { telemetry->shutdown(); }, [](NoopTelemetry) {}, }, instance()); } +void send_configuration_change() { + std::visit(details::Overload{ + [&](std::shared_ptr& telemetry) { + telemetry->send_configuration_change(); + }, + [](NoopTelemetry) {}, + }, + instance()); +} + void capture_configuration_change( const std::vector& new_configuration) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { - telemetry.capture_configuration_change(new_configuration); + [&](std::shared_ptr& telemetry) { + telemetry->capture_configuration_change(new_configuration); }, [](NoopTelemetry) {}, }, @@ -92,7 +102,9 @@ void capture_configuration_change( namespace log { void warning(std::string message) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { telemetry.log_warning(message); }, + [&](std::shared_ptr& telemetry) { + telemetry->log_warning(message); + }, [](NoopTelemetry) {}, }, instance()); @@ -100,7 +112,9 @@ void warning(std::string message) { void error(std::string message) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { telemetry.log_error(message); }, + [&](std::shared_ptr& telemetry) { + telemetry->log_error(message); + }, [](NoopTelemetry) {}, }, instance()); @@ -108,8 +122,8 @@ void error(std::string message) { void error(std::string message, std::string stacktrace) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { - telemetry.log_error(message, stacktrace); + [&](std::shared_ptr& telemetry) { + telemetry->log_error(message, stacktrace); }, [](auto&&) {}, }, @@ -119,18 +133,19 @@ void error(std::string message, std::string stacktrace) { namespace counter { void increment(const Counter& counter) { - std::visit( - details::Overload{ - [&](Telemetry& telemetry) { telemetry.increment_counter(counter); }, - [](auto&&) {}, - }, - instance()); + std::visit(details::Overload{ + [&](std::shared_ptr& telemetry) { + telemetry->increment_counter(counter); + }, + [](auto&&) {}, + }, + instance()); } void increment(const Counter& counter, const std::vector& tags) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { - telemetry.increment_counter(counter, tags); + [&](std::shared_ptr& telemetry) { + telemetry->increment_counter(counter, tags); }, [](auto&&) {}, }, @@ -138,18 +153,19 @@ void increment(const Counter& counter, const std::vector& tags) { } void decrement(const Counter& counter) { - std::visit( - details::Overload{ - [&](Telemetry& telemetry) { telemetry.decrement_counter(counter); }, - [](auto&&) {}, - }, - instance()); + std::visit(details::Overload{ + [&](std::shared_ptr& telemetry) { + telemetry->decrement_counter(counter); + }, + [](auto&&) {}, + }, + instance()); } void decrement(const Counter& counter, const std::vector& tags) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { - telemetry.decrement_counter(counter, tags); + [&](std::shared_ptr& telemetry) { + telemetry->decrement_counter(counter, tags); }, [](auto&&) {}, }, @@ -157,19 +173,20 @@ void decrement(const Counter& counter, const std::vector& tags) { } void set(const Counter& counter, uint64_t value) { - std::visit( - details::Overload{ - [&](Telemetry& telemetry) { telemetry.set_counter(counter, value); }, - [](auto&&) {}, - }, - instance()); + std::visit(details::Overload{ + [&](std::shared_ptr& telemetry) { + telemetry->set_counter(counter, value); + }, + [](auto&&) {}, + }, + instance()); } void set(const Counter& counter, const std::vector& tags, uint64_t value) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { - telemetry.set_counter(counter, tags, value); + [&](std::shared_ptr& telemetry) { + telemetry->set_counter(counter, tags, value); }, [](auto&&) {}, }, @@ -181,7 +198,9 @@ void set(const Counter& counter, const std::vector& tags, namespace rate { void set(const Rate& rate, uint64_t value) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { telemetry.set_rate(rate, value); }, + [&](std::shared_ptr& telemetry) { + telemetry->set_rate(rate, value); + }, [](auto&&) {}, }, instance()); @@ -189,12 +208,13 @@ void set(const Rate& rate, uint64_t value) { void set(const Rate& rate, const std::vector& tags, uint64_t value) { - std::visit( - details::Overload{ - [&](Telemetry& telemetry) { telemetry.set_rate(rate, tags, value); }, - [](auto&&) {}, - }, - instance()); + std::visit(details::Overload{ + [&](std::shared_ptr& telemetry) { + telemetry->set_rate(rate, tags, value); + }, + [](auto&&) {}, + }, + instance()); } } // namespace rate @@ -202,8 +222,8 @@ namespace distribution { void add(const Distribution& distribution, uint64_t value) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { - telemetry.add_datapoint(distribution, value); + [&](std::shared_ptr& telemetry) { + telemetry->add_datapoint(distribution, value); }, [](auto&&) {}, }, @@ -213,8 +233,8 @@ void add(const Distribution& distribution, uint64_t value) { void add(const Distribution& distribution, const std::vector& tags, uint64_t value) { std::visit(details::Overload{ - [&](Telemetry& telemetry) { - telemetry.add_datapoint(distribution, tags, value); + [&](std::shared_ptr& telemetry) { + telemetry->add_datapoint(distribution, tags, value); }, [](auto&&) {}, }, diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index 09ec61a3b..9f1e99143 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -222,24 +222,46 @@ Telemetry::Telemetry(FinalizedConfiguration config, http_client_(client), clock_(std::move(clock)), scheduler_(event_scheduler), - host_info_(get_host_info()) { - app_started(); - schedule_tasks(); + host_info_(get_host_info()) {} + +std::shared_ptr Telemetry::create( + FinalizedConfiguration config, TracerSignature tracer_signature, + std::shared_ptr logger, + std::shared_ptr client, + std::shared_ptr event_scheduler, + HTTPClient::URL agent_url, Clock clock) { + std::shared_ptr t(new Telemetry( + std::move(config), std::move(tracer_signature), std::move(logger), + std::move(client), std::move(event_scheduler), std::move(agent_url), + std::move(clock))); + t->app_started(); + t->schedule_tasks(); + return t; } void Telemetry::schedule_tasks() { tasks_.emplace_back(scheduler_->schedule_recurring_event( - config_.heartbeat_interval, - [this]() { send_payload("app-heartbeat", heartbeat_and_telemetry()); })); + config_.heartbeat_interval, [weak = weak_from_this()]() { + if (auto self = weak.lock()) { + self->send_payload("app-heartbeat", self->heartbeat_and_telemetry()); + } + })); if (config_.report_metrics) { tasks_.emplace_back(scheduler_->schedule_recurring_event( - config_.metrics_interval, [this]() mutable { capture_metrics(); })); + config_.metrics_interval, [weak = weak_from_this()]() { + if (auto self = weak.lock()) { + self->capture_metrics(); + } + })); } tasks_.emplace_back(scheduler_->schedule_recurring_event( - config_.extended_heartbeat_interval, [this]() { - send_payload("app-extended-heartbeat", extended_heartbeat_payload()); + config_.extended_heartbeat_interval, [weak = weak_from_this()]() { + if (auto self = weak.lock()) { + self->send_payload("app-extended-heartbeat", + self->extended_heartbeat_payload()); + } })); } @@ -250,52 +272,6 @@ Telemetry::~Telemetry() { } } -Telemetry::Telemetry(Telemetry&& rhs) - : config_(std::move(rhs.config_)), - logger_(std::move(rhs.logger_)), - telemetry_endpoint_(std::move(rhs.telemetry_endpoint_)), - tracer_signature_(std::move(rhs.tracer_signature_)), - http_client_(rhs.http_client_), - clock_(std::move(rhs.clock_)), - scheduler_(std::move(rhs.scheduler_)), - counters_(std::move(rhs.counters_)), - counters_snapshot_(std::move(rhs.counters_snapshot_)), - rates_(std::move(rhs.rates_)), - rates_snapshot_(std::move(rhs.rates_snapshot_)), - distributions_(std::move(rhs.distributions_)), - seq_id_(rhs.seq_id_), - config_seq_ids_(rhs.config_seq_ids_), - all_configurations_(rhs.all_configurations_), - host_info_(rhs.host_info_) { - cancel_tasks(rhs.tasks_); - schedule_tasks(); -} - -Telemetry& Telemetry::operator=(Telemetry&& rhs) { - if (&rhs != this) { - cancel_tasks(rhs.tasks_); - std::swap(config_, rhs.config_); - std::swap(logger_, rhs.logger_); - std::swap(telemetry_endpoint_, rhs.telemetry_endpoint_); - std::swap(http_client_, rhs.http_client_); - std::swap(tracer_signature_, rhs.tracer_signature_); - std::swap(http_client_, rhs.http_client_); - std::swap(clock_, rhs.clock_); - std::swap(scheduler_, rhs.scheduler_); - std::swap(counters_, rhs.counters_); - std::swap(counters_snapshot_, rhs.counters_snapshot_); - std::swap(rates_, rhs.rates_); - std::swap(rates_snapshot_, rhs.rates_snapshot_); - std::swap(distributions_, rhs.distributions_); - std::swap(seq_id_, rhs.seq_id_); - std::swap(config_seq_ids_, rhs.config_seq_ids_); - std::swap(all_configurations_, rhs.all_configurations_); - std::swap(host_info_, rhs.host_info_); - schedule_tasks(); - } - return *this; -} - void Telemetry::log_error(std::string message) { if (!config_.report_logs) return; increment_counter(internal_metrics::logs_created, {"level:error"}); @@ -364,6 +340,14 @@ void Telemetry::app_started() { } } +void Telemetry::shutdown() { + // cancel_tasks clears tasks_, so ~Telemetry() becomes a no-op after this. + cancel_tasks(tasks_); + app_closing(); + std::lock_guard l{http_client_mutex_}; + http_client_.reset(); +} + void Telemetry::app_closing() { // Capture metrics in-between two ticks to be sent with the last payload. capture_metrics(); @@ -373,6 +357,13 @@ void Telemetry::app_closing() { } void Telemetry::send_payload(StringView request_type, std::string payload) { + std::shared_ptr client; + { + std::lock_guard l{http_client_mutex_}; + client = http_client_; + } + if (!client) return; + auto set_telemetry_headers = [request_type, payload_size = payload.size(), debug_enabled = config_.debug, &signature = @@ -389,26 +380,27 @@ void Telemetry::send_payload(StringView request_type, std::string payload) { } }; - auto on_response = [this, logger = logger_](int response_status, - const DictReader&, - std::string response_body) { - if (response_status >= 500) { - increment_counter(internal_metrics::responses, - {"status_code:5xx", "endpoint:agent"}); - } else if (response_status >= 400) { - increment_counter(internal_metrics::responses, - {"status_code:4xx", "endpoint:agent"}); - } else if (response_status >= 300) { - increment_counter(internal_metrics::responses, - {"status_code:3xx", "endpoint:agent"}); - } else if (response_status >= 200) { - increment_counter(internal_metrics::responses, - {"status_code:2xx", "endpoint:agent"}); - } else if (response_status >= 100) { - increment_counter(internal_metrics::responses, - {"status_code:1xx", "endpoint:agent"}); + auto on_response = [weak = weak_from_this(), logger = logger_]( + int response_status, const DictReader&, + std::string response_body) { + if (auto self = weak.lock()) { + if (response_status >= 500) { + self->increment_counter(internal_metrics::responses, + {"status_code:5xx", "endpoint:agent"}); + } else if (response_status >= 400) { + self->increment_counter(internal_metrics::responses, + {"status_code:4xx", "endpoint:agent"}); + } else if (response_status >= 300) { + self->increment_counter(internal_metrics::responses, + {"status_code:3xx", "endpoint:agent"}); + } else if (response_status >= 200) { + self->increment_counter(internal_metrics::responses, + {"status_code:2xx", "endpoint:agent"}); + } else if (response_status >= 100) { + self->increment_counter(internal_metrics::responses, + {"status_code:1xx", "endpoint:agent"}); + } } - if (response_status < 200 || response_status >= 300) { logger->log_error([&](auto& stream) { stream << "Unexpected telemetry response status " << response_status @@ -419,9 +411,11 @@ void Telemetry::send_payload(StringView request_type, std::string payload) { }; // Callback for unsuccessful telemetry HTTP requests. - auto on_error = [this, logger = logger_](Error error) { - increment_counter(internal_metrics::errors, - {"type:network", "endpoint:agent"}); + auto on_error = [weak = weak_from_this(), logger = logger_](Error error) { + if (auto self = weak.lock()) { + self->increment_counter(internal_metrics::errors, + {"type:network", "endpoint:agent"}); + } logger->log_error(error.with_prefix( "Error occurred during HTTP request for telemetry: ")); }; @@ -431,9 +425,9 @@ void Telemetry::send_payload(StringView request_type, std::string payload) { payload.size()); auto post_result = - http_client_->post(telemetry_endpoint_, set_telemetry_headers, - std::move(payload), std::move(on_response), - std::move(on_error), clock_().tick + request_timeout); + client->post(telemetry_endpoint_, set_telemetry_headers, + std::move(payload), std::move(on_response), + std::move(on_error), clock_().tick + request_timeout); if (auto* error = post_result.if_error()) { increment_counter(internal_metrics::errors, {"type:network", "endpoint:agent"}); diff --git a/src/datadog/telemetry/telemetry_impl.h b/src/datadog/telemetry/telemetry_impl.h index d916846cb..59503565e 100644 --- a/src/datadog/telemetry/telemetry_impl.h +++ b/src/datadog/telemetry/telemetry_impl.h @@ -9,6 +9,7 @@ #include #include +#include #include #include "json.hpp" @@ -28,7 +29,7 @@ using MetricSnapshot = std::vector>; /// indeed a bottleneck, I'll embrace KISS principle. However, in a future /// iteration we could use multiple producer single consumer queue or /// lock-free queue. -class Telemetry final { +class Telemetry final : public std::enable_shared_from_this { /// Configuration object containing the validated settings for telemetry FinalizedConfiguration config_; /// Shared pointer to the user logger instance. @@ -72,12 +73,21 @@ class Telemetry final { tracing::HostInfo host_info_; + std::mutex http_client_mutex_; + public: - /// Constructor for the Telemetry class - /// - /// @param configuration The finalized configuration settings. - /// @param logger User logger instance. - /// @param metrics A vector user metrics to report. + static std::shared_ptr create( + FinalizedConfiguration configuration, + tracing::TracerSignature tracer_signature, + std::shared_ptr logger, + std::shared_ptr client, + std::shared_ptr event_scheduler, + tracing::HTTPClient::URL agent_url, + tracing::Clock clock = tracing::default_clock); + + ~Telemetry(); + + private: Telemetry(FinalizedConfiguration configuration, tracing::TracerSignature tracer_signature, std::shared_ptr logger, @@ -86,15 +96,7 @@ class Telemetry final { tracing::HTTPClient::URL agent_url, tracing::Clock clock = tracing::default_clock); - /// Destructor - /// - /// Send last metrics snapshot and `app-closing` event. - ~Telemetry(); - - /// Move semantics. - Telemetry(Telemetry&& rhs); - Telemetry& operator=(Telemetry&&); - + public: /// Capture and report internal error message to Datadog. /// /// @param message The error message. @@ -111,6 +113,12 @@ class Telemetry final { void capture_configuration_change( const std::vector& new_configuration); + // Deterministic shutdown: cancels scheduled tasks, sends the app-closing + // payload, drains in-flight HTTP requests, and releases the HTTP client + // (joining the background thread if this is the last reference). + // After this call the Telemetry object is inert and safe to destroy. + void shutdown(); + /// Counter void increment_counter(const Counter& counter); void increment_counter(const Counter& counter, diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index 80db9e976..9119892c7 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -111,12 +111,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { SECTION("ctor send app-started message") { SECTION("Without a defined integration") { - Telemetry telemetry{*finalize_config(), - tracer_signature, - logger, - client, - scheduler, - *url}; + auto telemetry = Telemetry::create(*finalize_config(), tracer_signature, + logger, client, scheduler, *url); /// By default the integration is `datadog` with the tracer version. /// TODO: remove the default because these datadog are already part of the /// request header. @@ -136,12 +132,9 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { Configuration cfg; cfg.integration_name = "nginx"; cfg.integration_version = "1.25.2"; - Telemetry telemetry2{*finalize_config(cfg), - tracer_signature, - logger, - client, - scheduler, - *url}; + auto telemetry2 = + Telemetry::create(*finalize_config(cfg), tracer_signature, logger, + client, scheduler, *url); auto app_started = nlohmann::json::parse(client->request_body); REQUIRE(is_valid_telemetry_payload(app_started) == true); @@ -166,12 +159,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { ddtest::EnvGuard install_time_env("DD_INSTRUMENTATION_INSTALL_TIME", "1703188212"); - Telemetry telemetry4{*finalize_config(), - tracer_signature, - logger, - client, - scheduler, - *url}; + auto telemetry4 = Telemetry::create(*finalize_config(), tracer_signature, + logger, client, scheduler, *url); auto app_started = nlohmann::json::parse(client->request_body); REQUIRE(is_valid_telemetry_payload(app_started) == true); @@ -218,12 +207,9 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { Configuration cfg; cfg.products.emplace_back(std::move(product)); - Telemetry telemetry3{*finalize_config(cfg), - tracer_signature, - logger, - client, - scheduler, - *url}; + auto telemetry3 = + Telemetry::create(*finalize_config(cfg), tracer_signature, logger, + client, scheduler, *url); auto app_started = nlohmann::json::parse(client->request_body); REQUIRE(is_valid_telemetry_payload(app_started) == true); @@ -272,7 +258,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { SECTION("generates a configuration change event") { SECTION("empty configuration do not generate a valid payload") { client->clear(); - telemetry3.send_configuration_change(); + telemetry3->send_configuration_change(); CHECK(client->request_body.empty()); } @@ -285,8 +271,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { Error{Error::Code::OTHER, "empty field"}}}; client->clear(); - telemetry3.capture_configuration_change(new_config); - telemetry3.send_configuration_change(); + telemetry3->capture_configuration_change(new_config); + telemetry3->send_configuration_change(); auto updates = client->request_body; REQUIRE(!updates.empty()); @@ -338,7 +324,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { // No update -> no configuration update client->clear(); - telemetry3.send_configuration_change(); + telemetry3->send_configuration_change(); CHECK(client->request_body.empty()); } } @@ -347,12 +333,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { SECTION("dtor send app-closing message") { { - Telemetry telemetry{*finalize_config(), - tracer_signature, - logger, - client, - scheduler, - *url}; + auto telemetry = Telemetry::create(*finalize_config(), tracer_signature, + logger, client, scheduler, *url); client->clear(); } @@ -376,8 +358,8 @@ TELEMETRY_IMPLEMENTATION_TEST("session ID headers") { auto session_rid = RuntimeID::generate(); const TracerSignature tracer_signature(session_rid, "testsvc", "test"); - Telemetry telemetry{ - *finalize_config(), tracer_signature, logger, client, scheduler, *url}; + auto telemetry = Telemetry::create(*finalize_config(), tracer_signature, + logger, client, scheduler, *url); auto it = client->request_headers.items.find("DD-Session-ID"); REQUIRE(it != client->request_headers.items.end()); @@ -393,8 +375,8 @@ TELEMETRY_IMPLEMENTATION_TEST("session ID headers") { const TracerSignature tracer_signature(session_rid, root_rid.string(), "testsvc", "test"); - Telemetry telemetry{ - *finalize_config(), tracer_signature, logger, client, scheduler, *url}; + auto telemetry = Telemetry::create(*finalize_config(), tracer_signature, + logger, client, scheduler, *url); auto session_it = client->request_headers.items.find("DD-Session-ID"); REQUIRE(session_it != client->request_headers.items.end()); @@ -411,8 +393,8 @@ TELEMETRY_IMPLEMENTATION_TEST("session ID headers") { const TracerSignature tracer_signature(session_rid, root_rid.string(), "testsvc", "test"); - Telemetry telemetry{ - *finalize_config(), tracer_signature, logger, client, scheduler, *url}; + auto telemetry = Telemetry::create(*finalize_config(), tracer_signature, + logger, client, scheduler, *url); client->clear(); scheduler->trigger_heartbeat(); @@ -445,13 +427,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { auto url = HTTPClient::URL::parse("http://localhost:8000"); - Telemetry telemetry{*finalize_config(), - tracer_signature, - logger, - client, - scheduler, - *url, - clock}; + auto telemetry = Telemetry::create(*finalize_config(), tracer_signature, + logger, client, scheduler, *url, clock); SECTION("generates a heartbeat message") { client->clear(); @@ -483,12 +460,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { cfg.products.emplace_back(std::move(product)); auto scheduler2 = std::make_shared(); - Telemetry telemetry2{*finalize_config(cfg), - tracer_signature, - logger, - client, - scheduler2, - *url}; + auto telemetry2 = Telemetry::create(*finalize_config(cfg), tracer_signature, + logger, client, scheduler2, *url); client->clear(); scheduler2->trigger_extended_heartbeat(); @@ -528,18 +501,14 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { cfg.products.emplace_back(std::move(product)); auto scheduler2 = std::make_shared(); - Telemetry telemetry2{*finalize_config(cfg), - tracer_signature, - logger, - client, - scheduler2, - *url}; + auto telemetry2 = Telemetry::create(*finalize_config(cfg), tracer_signature, + logger, client, scheduler2, *url); // Simulate a remote config update overriding SERVICE_NAME - telemetry2.capture_configuration_change( + telemetry2->capture_configuration_change( {{ConfigName::SERVICE_NAME, "rc-service", ConfigMetadata::Origin::REMOTE_CONFIG}}); - telemetry2.send_configuration_change(); + telemetry2->send_configuration_change(); client->clear(); scheduler2->trigger_extended_heartbeat(); @@ -571,18 +540,18 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { /// - can't decrement below zero. -> is that a telemetry requirements? /// - rates or counter reset to zero after capture. const Counter my_counter{"my_counter", "counter-test", true}; - telemetry.increment_counter(my_counter); // = 1 - telemetry.increment_counter(my_counter); // = 2 - telemetry.increment_counter(my_counter); // = 3 - telemetry.decrement_counter(my_counter); // = 2 + telemetry->increment_counter(my_counter); // = 1 + telemetry->increment_counter(my_counter); // = 2 + telemetry->increment_counter(my_counter); // = 3 + telemetry->decrement_counter(my_counter); // = 2 scheduler->trigger_metrics_capture(); - telemetry.increment_counter(my_counter); // = 1 + telemetry->increment_counter(my_counter); // = 1 scheduler->trigger_metrics_capture(); - telemetry.set_counter(my_counter, 42); - telemetry.set_counter(my_counter, {"event:test"}, 100); - telemetry.decrement_counter(my_counter, {"event:test"}); + telemetry->set_counter(my_counter, 42); + telemetry->set_counter(my_counter, {"event:test"}, 100); + telemetry->decrement_counter(my_counter, {"event:test"}); scheduler->trigger_metrics_capture(); // Expect 2 series: @@ -654,9 +623,9 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { SECTION("counters can't go below zero") { client->clear(); const Counter positive_counter{"positive_counter", "counter-test2", true}; - telemetry.decrement_counter(positive_counter); // = 0 - telemetry.decrement_counter(positive_counter); // = 0 - telemetry.decrement_counter(positive_counter); // = 0 + telemetry->decrement_counter(positive_counter); // = 0 + telemetry->decrement_counter(positive_counter); // = 0 + telemetry->decrement_counter(positive_counter); // = 0 scheduler->trigger_metrics_capture(); scheduler->trigger_heartbeat(); @@ -696,13 +665,13 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { client->clear(); Rate rps{"request", "rate-test", true}; - telemetry.set_rate(rps, 1000); + telemetry->set_rate(rps, 1000); scheduler->trigger_metrics_capture(); - telemetry.set_rate(rps, 2000); - telemetry.set_rate(rps, 5000); - telemetry.set_rate(rps, {"status:2xx"}, 5000); + telemetry->set_rate(rps, 2000); + telemetry->set_rate(rps, 5000); + telemetry->set_rate(rps, {"status:2xx"}, 5000); scheduler->trigger_metrics_capture(); @@ -772,17 +741,17 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { client->clear(); Distribution response_time{"response_time", "dist-test", false}; - telemetry.add_datapoint(response_time, 128); - telemetry.add_datapoint(response_time, 42); - telemetry.add_datapoint(response_time, 3000); + telemetry->add_datapoint(response_time, 128); + telemetry->add_datapoint(response_time, 42); + telemetry->add_datapoint(response_time, 3000); // Add a tag, this will add a new serie to the distribution payload. - telemetry.add_datapoint(response_time, {"status:200", "method:GET"}, - 6530); + telemetry->add_datapoint(response_time, {"status:200", "method:GET"}, + 6530); Distribution request_size{"request_size", "dist-test-2", true}; - telemetry.add_datapoint(request_size, 1843); - telemetry.add_datapoint(request_size, 4135); + telemetry->add_datapoint(request_size, 1843); + telemetry->add_datapoint(request_size, 4135); // Expect 3 series: // - `response_time` without tags: 3 datapoint (128, 42, 3000). @@ -851,16 +820,12 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { const Rate rps{"request", "rate-test", true}; const Counter my_counter{"my_counter", "counter-test", true}; { - Telemetry tmp_telemetry{*finalize_config(), - tracer_signature, - logger, - client, - scheduler, - *url, - clock}; - tmp_telemetry.increment_counter(my_counter); // = 1 - tmp_telemetry.add_datapoint(response_time, 128); - tmp_telemetry.set_rate(rps, 1000); + auto tmp_telemetry = + Telemetry::create(*finalize_config(), tracer_signature, logger, + client, scheduler, *url, clock); + tmp_telemetry->increment_counter(my_counter); // = 1 + tmp_telemetry->add_datapoint(response_time, 128); + tmp_telemetry->set_rate(rps, 1000); client->clear(); } @@ -969,7 +934,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { CAPTURE(test_case.name); client->clear(); - test_case.apply(telemetry, test_case.input, test_case.stacktrace); + test_case.apply(*telemetry, test_case.input, test_case.stacktrace); scheduler->trigger_heartbeat(); auto message_batch = nlohmann::json::parse(client->request_body); @@ -1004,14 +969,10 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { SECTION("dtor sends logs in `app-closing` message") { { - Telemetry tmp_telemetry{*finalize_config(), - tracer_signature, - logger, - client, - scheduler, - *url, - clock}; - tmp_telemetry.log_warning("Be careful!"); + auto tmp_telemetry = + Telemetry::create(*finalize_config(), tracer_signature, logger, + client, scheduler, *url, clock); + tmp_telemetry->log_warning("Be careful!"); client->clear(); } @@ -1056,8 +1017,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto final_cfg = finalize_config(cfg); REQUIRE(final_cfg); - Telemetry telemetry(*final_cfg, tracer_signature, logger, client, scheduler, - *url); + auto telemetry = Telemetry::create(*final_cfg, tracer_signature, logger, + client, scheduler, *url); CHECK(scheduler->metrics_callback == nullptr); CHECK(scheduler->metrics_interval == nullopt); } @@ -1070,8 +1031,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto final_cfg = finalize_config(cfg); REQUIRE(final_cfg); - Telemetry telemetry(*final_cfg, tracer_signature, logger, client, scheduler, - *url); + auto telemetry = Telemetry::create(*final_cfg, tracer_signature, logger, + client, scheduler, *url); CHECK(scheduler->metrics_callback != nullptr); CHECK(scheduler->metrics_interval == 500ms); @@ -1088,9 +1049,9 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto final_cfg = finalize_config(cfg); REQUIRE(final_cfg); - Telemetry telemetry(*final_cfg, tracer_signature, logger, client, scheduler, - *url); - telemetry.log_error("error"); + auto telemetry = Telemetry::create(*final_cfg, tracer_signature, logger, + client, scheduler, *url); + telemetry->log_error("error"); // NOTE(@dmehala): logs are sent with an heartbeat. scheduler->trigger_heartbeat(); diff --git a/test/test_span.cpp b/test/test_span.cpp index 65a79c30c..4af524ed7 100644 --- a/test/test_span.cpp +++ b/test/test_span.cpp @@ -749,6 +749,11 @@ TEST_SPAN("injecting W3C tracestate header") { // If one of these test cases results in a local sampling decision, let it be // "drop." config.trace_sampler.sample_rate = 0.0; + // Disable telemetry so no real Curl HTTP client / background thread is + // created. Otherwise the telemetry "app_started" request to a (likely + // absent) agent fails and logs an error into the shared MockLogger, racing + // against the `error_count() == 0` assertion below. + config.telemetry.enabled = false; const auto logger = std::make_shared(); config.logger = logger; config.collector = std::make_shared();