diff --git a/BUILD.bazel b/BUILD.bazel index df17ac563..11dfc5c7b 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -39,6 +39,10 @@ cc_library( "src/datadog/msgpack.cpp", "src/datadog/msgpack.h", "src/datadog/null_logger.h", + "src/datadog/otel_process_ctx.cpp", + "src/datadog/otel_process_ctx.h", + "src/datadog/otel_process_ctx_guard.cpp", + "src/datadog/otel_process_ctx_guard.h", "src/datadog/parse_util.cpp", "src/datadog/parse_util.h", "src/datadog/platform_util.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 8cc154f17..63a291040 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -194,6 +194,8 @@ target_sources(dd-trace-cpp-objects src/datadog/limiter.cpp src/datadog/logger.cpp src/datadog/msgpack.cpp + src/datadog/otel_process_ctx.cpp + src/datadog/otel_process_ctx_guard.cpp src/datadog/parse_util.cpp src/datadog/propagation_style.cpp src/datadog/random.cpp diff --git a/include/datadog/tracer.h b/include/datadog/tracer.h index 6a032a133..6f058dd6a 100644 --- a/include/datadog/tracer.h +++ b/include/datadog/tracer.h @@ -33,6 +33,7 @@ class TraceSampler; class SpanSampler; class IDGenerator; class InMemoryFile; +class OtelCtxGuard; class Tracer { std::shared_ptr logger_; @@ -51,6 +52,8 @@ class Tracer { // read to determine if the process is instrumented with a tracer and to // retrieve relevant tracing information. std::shared_ptr metadata_file_; + // Owns the published OpenTelemetry process context, if any. + std::unique_ptr otel_guard_; Baggage::Options baggage_opts_; bool baggage_injection_enabled_; bool baggage_extraction_enabled_; @@ -65,6 +68,16 @@ class Tracer { Tracer(const FinalizedTracerConfig& config, const std::shared_ptr& generator); + ~Tracer(); + + // Move-only. The otel context guarded by OtelCtxGuard is a process-wide + // singleton; duplicating ownership would lead to spurious drops, so copies + // are disallowed. 
+ Tracer(Tracer&&) noexcept; + Tracer& operator=(Tracer&&) noexcept; + Tracer(const Tracer&) = delete; + Tracer& operator=(const Tracer&) = delete; + // Create a new trace and return the root span of the trace. Optionally // specify a `config` indicating the attributes of the root span. Span create_span(); diff --git a/include/datadog/version.h b/include/datadog/version.h index 90f4ffca5..96f9889eb 100644 --- a/include/datadog/version.h +++ b/include/datadog/version.h @@ -4,6 +4,9 @@ namespace datadog::tracing { +// This library's name +extern const char *const tracer_library_name; + // The release version at or before this code revision, e.g. "v0.1.12". // That is, this code is at least as recent as `tracer_version`, but may be // more recent. diff --git a/src/datadog/error.cpp b/src/datadog/error.cpp index 8aa8d2f7f..ba7ab6f46 100644 --- a/src/datadog/error.cpp +++ b/src/datadog/error.cpp @@ -1,4 +1,5 @@ #include +#include #include @@ -6,8 +7,8 @@ namespace datadog { namespace tracing { std::ostream& operator<<(std::ostream& stream, const Error& error) { - return stream << "[dd-trace-cpp error code " << int(error.code) << "] " - << error.message; + return stream << "[" << tracer_library_name << " error code " + << int(error.code) << "] " << error.message; } Error Error::with_prefix(StringView prefix) const { diff --git a/src/datadog/otel_process_ctx.cpp b/src/datadog/otel_process_ctx.cpp new file mode 100644 index 000000000..9056e11dc --- /dev/null +++ b/src/datadog/otel_process_ctx.cpp @@ -0,0 +1,898 @@ +// clang-format off +// This file is a verbatim copy of an upstream reference implementation; do +// not let clang-format rewrite it. + +// The upstream home for this file is +// https://github.com/open-telemetry/sig-profiling/blob/main/process-context/c-and-cpp/otel_process_ctx.c + +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache License (Version 2.0). 
+// This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + +#include "otel_process_ctx.h" + +#ifndef _GNU_SOURCE + #define _GNU_SOURCE +#endif + +#if defined(__GNUC__) && defined(__cplusplus) && __cplusplus < 202002L + // This file uses C99 compound literals with designated initializers + // throughout. They are standard in C (since C99) and in C++ (since C++20), + // but trigger -Wpedantic (older GCC) or -Wc++20-extensions (newer GCC) when + // compiled as C++17 or older. Silencing -Wpragmas first lets GCC versions + // that don't know -Wc++20-extensions accept the pragma silently. + #pragma GCC diagnostic ignored "-Wpragmas" + #pragma GCC diagnostic ignored "-Wpedantic" + #pragma GCC diagnostic ignored "-Wc++20-extensions" +#endif + +// Note: Things here are needed for NOOP. Things that are only for non-NOOP get added further below. + +#include + +#define ADD_QUOTES_HELPER(x) #x +#define ADD_QUOTES(x) ADD_QUOTES_HELPER(x) + +// Positional aggregate init (no designated initializers) so this file's NOOP +// path compiles on MSVC at /std:c++17. 
+static const otel_process_ctx_data empty_data = { + NULL, // deployment_environment_name + NULL, // service_instance_id + NULL, // service_name + NULL, // service_version + NULL, // telemetry_sdk_language + NULL, // telemetry_sdk_version + NULL, // telemetry_sdk_name + NULL, // resource_attributes + NULL, // extra_attributes + NULL // thread_ctx_config +}; + +#if (defined(OTEL_PROCESS_CTX_NOOP) && OTEL_PROCESS_CTX_NOOP) || !defined(__linux__) + // NOOP implementations when OTEL_PROCESS_CTX_NOOP is defined or not on Linux + + otel_process_ctx_result otel_process_ctx_publish(const otel_process_ctx_data *data) { + (void) data; // Suppress unused parameter warning + otel_process_ctx_result result = {false, "OTEL_PROCESS_CTX_NOOP mode is enabled - no-op implementation (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + return result; + } + + bool otel_process_ctx_drop_current(void) { + return true; // Nothing to do, this always succeeds + } + + #ifndef OTEL_PROCESS_CTX_NO_READ + otel_process_ctx_read_result otel_process_ctx_read(void) { + otel_process_ctx_read_result result = {false, "OTEL_PROCESS_CTX_NOOP mode is enabled - no-op implementation (" __FILE__ ":" ADD_QUOTES(__LINE__) ")", empty_data}; + return result; + } + + bool otel_process_ctx_read_drop(otel_process_ctx_read_result *result) { + (void) result; // Suppress unused parameter warning + return false; + } + #endif // OTEL_PROCESS_CTX_NO_READ +#else // OTEL_PROCESS_CTX_NOOP + +#ifdef __cplusplus + #include + using std::atomic_thread_fence; + using std::memory_order_seq_cst; +#else + #include +#endif +#include +#include +#include +#include +#include +#include +#include + +#define KEY_VALUE_LIMIT 4096 +#define UINT14_MAX 16383 +#define OTEL_CTX_SIGNATURE "OTEL_CTX" + +#ifndef PR_SET_VMA + #define PR_SET_VMA 0x53564d41 + #define PR_SET_VMA_ANON_NAME 0 +#endif + +#ifndef MFD_NOEXEC_SEAL + #define MFD_NOEXEC_SEAL 8U +#endif + +/** + * The process context data that's written into the published anonymous mapping. 
+ * + * An outside-of-process reader will read this struct + otel_process_payload to get the data. + */ +typedef struct __attribute__((packed, aligned(8))) { + char otel_process_ctx_signature[8]; // Always "OTEL_CTX" + uint32_t otel_process_ctx_version; // Always > 0, incremented when the data structure changes, currently v2 + uint32_t otel_process_payload_size; // Always > 0, size of storage + uint64_t otel_process_monotonic_published_at_ns; // Timestamp from when the context was published in nanoseconds from CLOCK_BOOTTIME. 0 during updates. + char *otel_process_payload; // Always non-null, points to the storage for the data; expected to be a protobuf map of string key/value pairs, null-terminated +} otel_process_ctx_mapping; + +/** + * The full state of a published process context. + * + * It is used to store all the data for the process context that needs to be kept around while the context is published. + */ +typedef struct { + // The pid of the process that published the context. + pid_t publisher_pid; + // The actual mapping of the process context. Note that because we `madvise(..., MADV_DONTFORK)` this mapping is not + // propagated to child processes and thus `mapping` is only valid on the process that published the context. + otel_process_ctx_mapping *mapping; + // The process context payload. + char *payload; +} otel_process_ctx_state; + +/** + * Only one context is active, so we keep its state as a global. 
+ */ +static otel_process_ctx_state published_state; + +static otel_process_ctx_result otel_process_ctx_update(uint64_t monotonic_published_at_ns, const otel_process_ctx_data *data); +static otel_process_ctx_result otel_process_ctx_encode_protobuf_payload(char **out, uint32_t *out_size, otel_process_ctx_data data); + +static uint64_t monotonic_time_now_ns(void) { + struct timespec ts; + if (clock_gettime(CLOCK_BOOTTIME, &ts) == -1) return 0; + return ts.tv_sec * 1000000000ULL + ts.tv_nsec; +} + +static bool ctx_is_published(otel_process_ctx_state state) { + return state.mapping != NULL && state.mapping != MAP_FAILED && getpid() == state.publisher_pid; +} + +// The process context is designed to be read by an outside-of-process reader. Thus, for concurrency purposes the steps +// on this method are ordered in a way to avoid races, or if not possible to avoid, to allow the reader to detect if there was a race. +otel_process_ctx_result otel_process_ctx_publish(const otel_process_ctx_data *data) { + if (!data) return (otel_process_ctx_result) {.success = false, .error_message = "otel_process_ctx_data is NULL (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + + uint64_t monotonic_published_at_ns = monotonic_time_now_ns(); + if (monotonic_published_at_ns == 0) { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to get current time (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + + // Step: If the context has been published by this process, update it in place + if (ctx_is_published(published_state)) return otel_process_ctx_update(monotonic_published_at_ns, data); + + // Step: Drop any previous context state if it exists + // No state should be around anywhere after this step. 
+ if (!otel_process_ctx_drop_current()) { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to drop previous context (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + + // Step: Prepare the payload to be published + // The payload SHOULD be ready and valid before trying to actually create the mapping. + uint32_t payload_size = 0; + otel_process_ctx_result result = otel_process_ctx_encode_protobuf_payload(&published_state.payload, &payload_size, *data); + if (!result.success) return result; + + // Step: Create the mapping + const ssize_t mapping_size = sizeof(otel_process_ctx_mapping); + published_state.publisher_pid = getpid(); // This allows us to detect in forks that we shouldn't touch the mapping + int fd = memfd_create("OTEL_CTX", MFD_CLOEXEC | MFD_ALLOW_SEALING | MFD_NOEXEC_SEAL); + if (fd < 0) { + // MFD_NOEXEC_SEAL is a newer flag; older kernels reject unknown flags, so let's retry without it + fd = memfd_create("OTEL_CTX", MFD_CLOEXEC | MFD_ALLOW_SEALING); + } + bool failed_to_close_fd = false; + if (fd >= 0) { + // Try to create mapping from memfd + if (ftruncate(fd, mapping_size) == -1) { + close(fd); // Swallow errors here, truncation already failed anyway + otel_process_ctx_drop_current(); + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to truncate memfd (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + published_state.mapping = (otel_process_ctx_mapping *) mmap(NULL, mapping_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0); + failed_to_close_fd = (close(fd) == -1); + } else { + // Fallback: Use an anonymous mapping instead + published_state.mapping = (otel_process_ctx_mapping *) mmap(NULL, mapping_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + } + if (published_state.mapping == MAP_FAILED || failed_to_close_fd) { + otel_process_ctx_drop_current(); + + if (failed_to_close_fd) { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to close memfd (" 
__FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } else { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to allocate mapping (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + } + + // Step: Setup MADV_DONTFORK + // This ensures that the mapping is not propagated to child processes (they should call update/publish again). + if (madvise(published_state.mapping, mapping_size, MADV_DONTFORK) == -1) { + if (otel_process_ctx_drop_current()) { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to setup MADV_DONTFORK (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } else { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to drop context (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + } + + // Step: Populate the mapping + // The payload and any extra fields must come first and not be reordered with the monotonic_published_at_ns by the compiler. + *published_state.mapping = (otel_process_ctx_mapping) { + .otel_process_ctx_signature = { 'O', 'T', 'E', 'L', '_', 'C', 'T', 'X' }, + .otel_process_ctx_version = 2, + .otel_process_payload_size = payload_size, + .otel_process_monotonic_published_at_ns = 0, // Set in "Step: Populate the monotonic_published_at_ns into the mapping" below + .otel_process_payload = published_state.payload + }; + + // Step: Synchronization - Mapping has been filled and is missing monotonic_published_at_ns + // Make sure the initialization of the mapping + payload above does not get reordered with setting the monotonic_published_at_ns below. Setting + // the monotonic_published_at_ns is what tells an outside reader that the context is fully published. + atomic_thread_fence(memory_order_seq_cst); + + // Step: Populate the monotonic_published_at_ns into the mapping + // The monotonic_published_at_ns must come last and not be reordered with the fields above by the compiler. 
After this step, external readers + // can read the monotonic_published_at_ns and know that the payload is ready to be read. + published_state.mapping->otel_process_monotonic_published_at_ns = monotonic_published_at_ns; + + // Step: Attempt to name the mapping so outside readers can: + // * Find it by name + // * Hook on prctl to detect when new mappings are published + if (prctl(PR_SET_VMA, PR_SET_VMA_ANON_NAME, published_state.mapping, mapping_size, OTEL_CTX_SIGNATURE) == -1) { + // Naming an anonymous mapping is an optional Linux 5.17+ feature (`CONFIG_ANON_VMA_NAME`). + // Many distros, such as Ubuntu and Arch, enable it. On earlier kernel versions or kernels without the feature, this call can fail. + // + // It's OK for this to fail because (per-usecase): + // 1. "Find it by name" => As a fallback, it's possible to scan the mappings and look for the memfd name. + // 2. "Hook on prctl" => When hooking on prctl via eBPF it's still possible to see this call, even when it's not supported/enabled. + // This works even on older kernels! For this reason we unconditionally make this call even on older kernels -- to + // still allow detection via hooking onto prctl. + } + + // All done! + + return (otel_process_ctx_result) {.success = true, .error_message = NULL}; +} + +bool otel_process_ctx_drop_current(void) { + otel_process_ctx_state state = published_state; + + // Zero out the state and make sure no operations below are reordered with zeroing + published_state = (otel_process_ctx_state) {.publisher_pid = 0, .mapping = NULL, .payload = NULL}; + atomic_thread_fence(memory_order_seq_cst); + + bool success = true; + + // The mapping only exists if it was created by the current process; if it was inherited by a fork it doesn't exist anymore + // (due to the MADV_DONTFORK) and we don't need to do anything to it. 
+ if (ctx_is_published(state)) { + success = munmap(state.mapping, sizeof(otel_process_ctx_mapping)) == 0; + } + + // The payload may have been inherited from a parent. This is a regular malloc so we need to free it so we don't leak. + free(state.payload); + + return success; +} + +static otel_process_ctx_result otel_process_ctx_update(uint64_t monotonic_published_at_ns, const otel_process_ctx_data *data) { + if (data == NULL || !ctx_is_published(published_state)) { + return (otel_process_ctx_result) {.success = false, .error_message = "Unexpected: otel_process_ctx_data is NULL or context is not published (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + + if (monotonic_published_at_ns == published_state.mapping->otel_process_monotonic_published_at_ns) { + // Advance published_at_ns to allow readers to detect the update + monotonic_published_at_ns++; + } + + // Step: Prepare the new payload to be published + // The payload SHOULD be ready and valid before trying to actually update the mapping. 
+ uint32_t payload_size = 0; + char *payload; + otel_process_ctx_result result = otel_process_ctx_encode_protobuf_payload(&payload, &payload_size, *data); + if (!result.success) return result; + + // Step: Zero out monotonic_published_at_ns in the mapping + // This enables readers to detect that an update is in-progress + published_state.mapping->otel_process_monotonic_published_at_ns = 0; + + // Step: Synchronization - Make sure readers observe the zeroing above before anything else below + atomic_thread_fence(memory_order_seq_cst); + + // Step: Install updated data + published_state.mapping->otel_process_payload_size = payload_size; + published_state.mapping->otel_process_payload = payload; + + // Step: Synchronization - Make sure readers observe the updated data before anything else below + atomic_thread_fence(memory_order_seq_cst); + + // Step: Install new monotonic_published_at_ns + // The update is now complete -- readers that observe the new timestamp will observe the updated payload + published_state.mapping->otel_process_monotonic_published_at_ns = monotonic_published_at_ns; + + // Step: Attempt to name the mapping so outside readers can detect the update + if (prctl(PR_SET_VMA, PR_SET_VMA_ANON_NAME, published_state.mapping, sizeof(otel_process_ctx_mapping), OTEL_CTX_SIGNATURE) == -1) { + // It's OK for this to fail -- see otel_process_ctx_publish for why + } + + // Step: Update bookkeeping + free(published_state.payload); // This was still pointing to the old payload + published_state.payload = payload; + + // All done! + + return (otel_process_ctx_result) {.success = true, .error_message = NULL}; +} + +// The caller is responsible for enforcing that value fits within UINT14_MAX +static size_t protobuf_varint_size(uint16_t value) { return value >= 128 ? 
2 : 1; } + +// Field tag for record + varint len + data +static size_t protobuf_record_size(size_t len) { return 1 + protobuf_varint_size(len) + len; } + +static size_t protobuf_string_size(const char *str) { return protobuf_record_size(strlen(str)); } + +static size_t protobuf_otel_keyvalue_string_size(const char *key, const char *value) { + size_t key_field_size = protobuf_string_size(key); // String + size_t value_field_size = protobuf_record_size(protobuf_string_size(value)); // Nested AnyValue message with a string inside + return key_field_size + value_field_size; // Does not include the keyvalue record tag + size, only its payload +} + +static size_t protobuf_otel_array_value_content_size(const char **strings) { + size_t total = 0; + for (size_t i = 0; strings[i] != NULL; i++) { + total += protobuf_record_size(protobuf_string_size(strings[i])); // ArrayValue.values[i]: AnyValue{string_value} + } + return total; +} + +// As a simplification, we enforce that keys and values are <= 4096 (KEY_VALUE_LIMIT) so that their size + extra bytes always fits within UINT14_MAX +static otel_process_ctx_result validate_and_calculate_protobuf_payload_size(size_t *out_pairs_size, const char **pairs) { + size_t num_entries = 0; + for (size_t i = 0; pairs[i] != NULL; i++) num_entries++; + if (num_entries % 2 != 0) { + return (otel_process_ctx_result) {.success = false, .error_message = "Value in otel_process_ctx_data is NULL (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + + *out_pairs_size = 0; + for (size_t i = 0; pairs[i * 2] != NULL; i++) { + const char *key = pairs[i * 2]; + const char *value = pairs[i * 2 + 1]; + + if (strlen(key) > KEY_VALUE_LIMIT) { + return (otel_process_ctx_result) {.success = false, .error_message = "Length of key in otel_process_ctx_data exceeds 4096 limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + if (strlen(value) > KEY_VALUE_LIMIT) { + return (otel_process_ctx_result) {.success = false, .error_message = "Length of value in 
otel_process_ctx_data exceeds 4096 limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + + *out_pairs_size += protobuf_record_size(protobuf_otel_keyvalue_string_size(key, value)); // KeyValue message + } + return (otel_process_ctx_result) {.success = true, .error_message = NULL}; +} + +/** + * Writes a protobuf varint encoding for the given value. + * As a simplification, only supports values that fit in 1 or 2 bytes (0-16383 UINT14_MAX). + */ +static void write_protobuf_varint(char **ptr, uint16_t value) { + if (protobuf_varint_size(value) == 1) { + *(*ptr)++ = (char)value; + } else { + // Two bytes: first byte has MSB set, second byte has value + *(*ptr)++ = (char)((value & 0x7F) | 0x80); // Low 7 bits + continuation bit + *(*ptr)++ = (char)(value >> 7); // High 7 bits + } +} + +static void write_protobuf_string(char **ptr, const char *str) { + size_t len = strlen(str); + write_protobuf_varint(ptr, len); + memcpy(*ptr, str, len); + *ptr += len; +} + +static void write_protobuf_tag(char **ptr, uint8_t field_number) { + *(*ptr)++ = (char)((field_number << 3) | 2); // Field type is always 2 (LEN) +} + +static void write_attribute(char **ptr, uint8_t field_number, const char *key, const char *value) { + write_protobuf_tag(ptr, field_number); + write_protobuf_varint(ptr, protobuf_otel_keyvalue_string_size(key, value)); + + // KeyValue + write_protobuf_tag(ptr, 1); // KeyValue.key (field 1) + write_protobuf_string(ptr, key); + write_protobuf_tag(ptr, 2); // KeyValue.value (field 2) + write_protobuf_varint(ptr, protobuf_string_size(value)); + + // AnyValue + write_protobuf_tag(ptr, 1); // AnyValue.string_value (field 1) + write_protobuf_string(ptr, value); +} + +static void write_array_attribute(char **ptr, uint8_t field_number, const char *key, const char **strings) { + size_t array_value_content_size = protobuf_otel_array_value_content_size(strings); + size_t any_value_content_size = protobuf_record_size(array_value_content_size); + size_t kv_content_size = 
protobuf_string_size(key) + protobuf_record_size(any_value_content_size); + + write_protobuf_tag(ptr, field_number); + write_protobuf_varint(ptr, kv_content_size); + + write_protobuf_tag(ptr, 1); // KeyValue.key (field 1) + write_protobuf_string(ptr, key); + + write_protobuf_tag(ptr, 2); // KeyValue.value (field 2) = AnyValue message + write_protobuf_varint(ptr, any_value_content_size); + + write_protobuf_tag(ptr, 5); // AnyValue.array_value (field 5) = ArrayValue message + write_protobuf_varint(ptr, array_value_content_size); + + for (size_t i = 0; strings[i] != NULL; i++) { // ArrayValue.values (field 1) - repeated AnyValue entries + write_protobuf_tag(ptr, 1); // ArrayValue.values[i] + write_protobuf_varint(ptr, protobuf_string_size(strings[i])); // Inner AnyValue size + write_protobuf_tag(ptr, 1); // AnyValue.string_value (field 1) + write_protobuf_string(ptr, strings[i]); + } +} + +// Encode the payload as protobuf bytes. +// +// This method implements an extremely compact but limited protobuf encoder for the ProcessContext message. +// It encodes all fields as Resource attributes (KeyValue pairs). +// For extra compact code, it fixes strings at up to 4096 bytes. 
+static otel_process_ctx_result otel_process_ctx_encode_protobuf_payload(char **out, uint32_t *out_size, otel_process_ctx_data data) { + const char *pairs[] = { + "deployment.environment.name", data.deployment_environment_name, + "service.instance.id", data.service_instance_id, + "service.name", data.service_name, + "service.version", data.service_version, + "telemetry.sdk.language", data.telemetry_sdk_language, + "telemetry.sdk.version", data.telemetry_sdk_version, + "telemetry.sdk.name", data.telemetry_sdk_name, + NULL + }; + + size_t pairs_size = 0; + otel_process_ctx_result validation_result = validate_and_calculate_protobuf_payload_size(&pairs_size, (const char **) pairs); + if (!validation_result.success) return validation_result; + + size_t resource_attributes_pairs_size = 0; + if (data.resource_attributes != NULL) { + validation_result = validate_and_calculate_protobuf_payload_size(&resource_attributes_pairs_size, data.resource_attributes); + if (!validation_result.success) return validation_result; + } + + size_t extra_attributes_pairs_size = 0; + if (data.extra_attributes != NULL) { + validation_result = validate_and_calculate_protobuf_payload_size(&extra_attributes_pairs_size, data.extra_attributes); + if (!validation_result.success) return validation_result; + } + + size_t thread_ctx_pairs_size = 0; + if (data.thread_ctx_config != NULL) { + if (data.thread_ctx_config->schema_version != NULL) { + const char *thread_ctx_pairs[] = {"threadlocal.schema_version", data.thread_ctx_config->schema_version, NULL}; + validation_result = validate_and_calculate_protobuf_payload_size(&thread_ctx_pairs_size, thread_ctx_pairs); + if (!validation_result.success) return validation_result; + } + if (data.thread_ctx_config->attribute_key_map != NULL) { + if (data.thread_ctx_config->schema_version == NULL) { + return (otel_process_ctx_result) {.success = false, .error_message = "attribute_key_map requires schema_version to be set (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + 
} + for (size_t i = 0; data.thread_ctx_config->attribute_key_map[i] != NULL; i++) { + if (strlen(data.thread_ctx_config->attribute_key_map[i]) > KEY_VALUE_LIMIT) { + return (otel_process_ctx_result) {.success = false, .error_message = "Length of attribute_key_map entry exceeds 4096 limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + } + size_t array_value_content_size = protobuf_otel_array_value_content_size(data.thread_ctx_config->attribute_key_map); + size_t any_value_content_size = protobuf_record_size(array_value_content_size); + size_t kv_content_size = protobuf_string_size("threadlocal.attribute_key_map") + protobuf_record_size(any_value_content_size); + if (kv_content_size > UINT14_MAX) { + return (otel_process_ctx_result) {.success = false, .error_message = "Encoded size of attribute_key_map exceeds UINT14_MAX limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + thread_ctx_pairs_size += protobuf_record_size(kv_content_size); + } + } + + size_t resource_size = pairs_size + resource_attributes_pairs_size; + if (resource_size > UINT14_MAX) { + return (otel_process_ctx_result) {.success = false, .error_message = "Encoded size of resource attributes exceeds UINT14_MAX limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + size_t total_size = protobuf_record_size(resource_size) + extra_attributes_pairs_size + thread_ctx_pairs_size; + + char *encoded = (char *) calloc(total_size, 1); + if (!encoded) { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to allocate memory for payload (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + char *ptr = encoded; + + // ProcessContext.resource (field 1) + write_protobuf_tag(&ptr, 1); + write_protobuf_varint(&ptr, resource_size); + + for (size_t i = 0; pairs[i * 2] != NULL; i++) { + write_attribute(&ptr, 1, pairs[i * 2], pairs[i * 2 + 1]); + } + + for (size_t i = 0; data.resource_attributes != NULL && data.resource_attributes[i * 2] != NULL; i++) { + write_attribute(&ptr, 1, 
data.resource_attributes[i * 2], data.resource_attributes[i * 2 + 1]); + } + + // ProcessContext.extra_attributes (field 2) + for (size_t i = 0; data.extra_attributes != NULL && data.extra_attributes[i * 2] != NULL; i++) { + write_attribute(&ptr, 2, data.extra_attributes[i * 2], data.extra_attributes[i * 2 + 1]); + } + + if (data.thread_ctx_config != NULL) { + if (data.thread_ctx_config->schema_version != NULL) { + write_attribute(&ptr, 2, "threadlocal.schema_version", data.thread_ctx_config->schema_version); + } + if (data.thread_ctx_config->attribute_key_map != NULL) { + write_array_attribute(&ptr, 2, "threadlocal.attribute_key_map", data.thread_ctx_config->attribute_key_map); + } + } + + *out = encoded; + *out_size = (uint32_t) total_size; + + return (otel_process_ctx_result) {.success = true, .error_message = NULL}; +} + +#ifndef OTEL_PROCESS_CTX_NO_READ + #include + #include + #include + #include + + // Note: The below parsing code is only for otel_process_ctx_read and is only provided for debugging + // and testing purposes. 
+ + static void *parse_mapping_start(char *line) { + char *endptr = NULL; + unsigned long long start = strtoull(line, &endptr, 16); + if (start == 0 || start == ULLONG_MAX) return NULL; + return (void *)(uintptr_t) start; + } + + static otel_process_ctx_mapping *try_finding_mapping(void) { + char line[8192]; + otel_process_ctx_mapping *result = NULL; + + FILE *fp = fopen("/proc/self/maps", "r"); + if (!fp) return result; + + while (fgets(line, sizeof(line), fp)) { + bool is_process_ctx = strstr(line, "[anon_shmem:OTEL_CTX]") != NULL || strstr(line, "[anon:OTEL_CTX]") != NULL || strstr(line, "/memfd:OTEL_CTX") != NULL; + if (is_process_ctx) { + result = (otel_process_ctx_mapping *)parse_mapping_start(line); + break; + } + } + + fclose(fp); + return result; + } + + // Helper function to read a protobuf varint (limited to 1-2 bytes, max value UINT14_MAX, matching write_protobuf_varint above) + static bool read_protobuf_varint(char **ptr, char *end_ptr, uint16_t *value) { + if (*ptr >= end_ptr) return false; + + unsigned char first_byte = (unsigned char)**ptr; + (*ptr)++; + + if (first_byte < 128) { + *value = first_byte; + return true; + } else { + if (*ptr >= end_ptr) return false; + unsigned char second_byte = (unsigned char)**ptr; + (*ptr)++; + + *value = (first_byte & 0x7F) | (second_byte << 7); + return *value <= UINT14_MAX; + } + } + + // Helper function to read a protobuf string into a buffer, within the same limits as the encoder imposes + static bool read_protobuf_string(char **ptr, char *end_ptr, char *buffer) { + uint16_t len; + if (!read_protobuf_varint(ptr, end_ptr, &len) || len >= KEY_VALUE_LIMIT + 1 || *ptr + len > end_ptr) return false; + + memcpy(buffer, *ptr, len); + buffer[len] = '\0'; + *ptr += len; + + return true; + } + + // Reads the field number and validates the fixed LEN wire type + static bool read_protobuf_tag(char **ptr, char *end_ptr, uint8_t *field_number) { + if (*ptr >= end_ptr) return false; + + unsigned char tag = (unsigned char)**ptr; + 
(*ptr)++; + + uint8_t wire_type = tag & 0x07; + *field_number = tag >> 3; + + return wire_type == 2; // We only need the LEN wire type for now + } + + // Peeks at the key of an OTel KeyValue message without advancing the pointer. + static bool peek_protobuf_key(char *ptr, char *end_ptr, char *key_buffer) { + char *p = ptr; + uint8_t kv_field; + if (!read_protobuf_tag(&p, end_ptr, &kv_field)) return false; + if (kv_field != 1) return false; // KeyValue.key is field 1 + return read_protobuf_string(&p, end_ptr, key_buffer); + } + + // Reads an OTel KeyValue message (key string + AnyValue-wrapped string) into the provided buffers. + static bool read_protobuf_keyvalue(char **ptr, char *end_ptr, char *key_buffer, char *value_buffer) { + bool key_found = false; + bool value_found = false; + + while (*ptr < end_ptr) { + uint8_t kv_field; + if (!read_protobuf_tag(ptr, end_ptr, &kv_field)) return false; + + if (kv_field == 1) { // KeyValue.key + if (!read_protobuf_string(ptr, end_ptr, key_buffer)) return false; + key_found = true; + } else if (kv_field == 2) { // KeyValue.value (AnyValue) + uint16_t _any_len; // Unused, but we still need to consume + validate the varint + if (!read_protobuf_varint(ptr, end_ptr, &_any_len)) return false; + uint8_t any_field; + if (!read_protobuf_tag(ptr, end_ptr, &any_field)) return false; + + if (any_field == 1) { // AnyValue.string_value + if (!read_protobuf_string(ptr, end_ptr, value_buffer)) return false; + value_found = true; + } + } + } + + return key_found && value_found; + } + + // Reads an AnyValue.array_value (field 5) from ptr; ptr must be at KeyValue.value (tag 2). + // Allocates a NULL-terminated array of strings and sets *out_array immediately. On error the caller must free it. 
+ static bool read_protobuf_array_value_strings(char **ptr, char *end_ptr, char *value_buffer, const char ***out_array) { + uint8_t field; + if (!read_protobuf_tag(ptr, end_ptr, &field) || field != 2) return false; + uint16_t any_len; + if (!read_protobuf_varint(ptr, end_ptr, &any_len)) return false; + char *any_end = *ptr + any_len; + if (any_end > end_ptr) return false; + + if (!read_protobuf_tag(ptr, any_end, &field) || field != 5) return false; + uint16_t array_len; + if (!read_protobuf_varint(ptr, any_end, &array_len)) return false; + char *array_end = *ptr + array_len; + if (array_end > any_end) return false; + + size_t max = 100; + size_t capacity = max + 1; + const char **arr = (const char **) calloc(capacity, sizeof(char *)); + if (!arr) return false; + *out_array = arr; + size_t count = 0; + + while (*ptr < array_end) { + if (count >= max) return false; + if (!read_protobuf_tag(ptr, array_end, &field) || field != 1) return false; + uint16_t item_len; + if (!read_protobuf_varint(ptr, array_end, &item_len)) return false; + char *item_end = *ptr + item_len; + if (item_end > array_end) return false; + if (!read_protobuf_tag(ptr, item_end, &field) || field != 1) return false; + if (!read_protobuf_string(ptr, item_end, value_buffer)) return false; + char *dup = strdup(value_buffer); + if (!dup) return false; + arr[count++] = dup; + } + + return true; + } + + static bool ensure_thread_ctx_config(otel_process_ctx_data *data_out) { + if (data_out->thread_ctx_config) return true; + otel_thread_ctx_config_data *setup = (otel_thread_ctx_config_data *) calloc(1, sizeof(otel_thread_ctx_config_data)); + if (!setup) return false; + data_out->thread_ctx_config = setup; + return true; + } + + // Simplified protobuf decoder to match the exact encoder above. If the protobuf data doesn't match the encoder, this will + // return false. 
+ static bool otel_process_ctx_decode_payload(char *payload, uint32_t payload_size, otel_process_ctx_data *data_out, char *key_buffer, char *value_buffer) { + char *ptr = payload; + char *end_ptr = payload + payload_size; + + *data_out = empty_data; + + // Parse ProcessContext wrapper - expect field 1 (resource) + uint8_t process_ctx_field; + if (!read_protobuf_tag(&ptr, end_ptr, &process_ctx_field) || process_ctx_field != 1) return false; + + uint16_t resource_len; + if (!read_protobuf_varint(&ptr, end_ptr, &resource_len)) return false; + char *resource_end = ptr + resource_len; + if (resource_end > end_ptr) return false; + + size_t resource_index = 0; + size_t resource_capacity = 201; // Allocate space for 100 pairs + NULL terminator entry + data_out->resource_attributes = (const char **) calloc(resource_capacity, sizeof(char *)); + if (data_out->resource_attributes == NULL) return false; + + size_t extra_attributes_index = 0; + size_t extra_attributes_capacity = 201; // Allocate space for 100 pairs + NULL terminator entry + data_out->extra_attributes = (const char **) calloc(extra_attributes_capacity, sizeof(char *)); + if (data_out->extra_attributes == NULL) return false; + + // Parse resource attributes (field 1) + while (ptr < resource_end) { + uint8_t field_number; + if (!read_protobuf_tag(&ptr, resource_end, &field_number) || field_number != 1) return false; + + uint16_t kv_len; + if (!read_protobuf_varint(&ptr, resource_end, &kv_len)) return false; + char *kv_end = ptr + kv_len; + if (kv_end > resource_end) return false; + + if (!read_protobuf_keyvalue(&ptr, kv_end, key_buffer, value_buffer)) return false; + + char *value = strdup(value_buffer); + if (!value) return false; + + // Dispatch based on key + const char **field = NULL; + if (strcmp(key_buffer, "deployment.environment.name") == 0) { field = &data_out->deployment_environment_name; } + else if (strcmp(key_buffer, "service.instance.id") == 0) { field = &data_out->service_instance_id; } + else if 
(strcmp(key_buffer, "service.name") == 0) { field = &data_out->service_name; } + else if (strcmp(key_buffer, "service.version") == 0) { field = &data_out->service_version; } + else if (strcmp(key_buffer, "telemetry.sdk.language") == 0) { field = &data_out->telemetry_sdk_language; } + else if (strcmp(key_buffer, "telemetry.sdk.version") == 0) { field = &data_out->telemetry_sdk_version; } + else if (strcmp(key_buffer, "telemetry.sdk.name") == 0) { field = &data_out->telemetry_sdk_name; } + + if (field != NULL) { + if (*field != NULL) { free(value); return false; } + *field = value; + } else { + char *key = strdup(key_buffer); + + if (!key || resource_index + 2 >= resource_capacity) { + free(key); + free(value); + return false; + } + data_out->resource_attributes[resource_index] = key; + data_out->resource_attributes[resource_index + 1] = value; + resource_index += 2; + } + } + + // Parse extra attributes (field 2) + while (ptr < end_ptr) { + uint8_t extra_ctx_field; + if (!read_protobuf_tag(&ptr, end_ptr, &extra_ctx_field) || extra_ctx_field != 2) return false; + + uint16_t kv_len; + if (!read_protobuf_varint(&ptr, end_ptr, &kv_len)) return false; + char *kv_end = ptr + kv_len; + if (kv_end > end_ptr) return false; + + if (!peek_protobuf_key(ptr, kv_end, key_buffer)) return false; + + if (strcmp(key_buffer, "threadlocal.attribute_key_map") == 0) { + // Consume key to advance ptr + uint8_t kv_field; + if (!read_protobuf_tag(&ptr, kv_end, &kv_field) || kv_field != 1) return false; + if (!read_protobuf_string(&ptr, kv_end, key_buffer)) return false; + if (!ensure_thread_ctx_config(data_out)) return false; + if (data_out->thread_ctx_config->attribute_key_map != NULL) return false; + if (!read_protobuf_array_value_strings(&ptr, kv_end, value_buffer, &((otel_thread_ctx_config_data *)data_out->thread_ctx_config)->attribute_key_map)) return false; + } else { + if (!read_protobuf_keyvalue(&ptr, kv_end, key_buffer, value_buffer)) return false; + + char *value = 
strdup(value_buffer); + if (!value) return false; + + // Dispatch based on key + if (strcmp(key_buffer, "threadlocal.schema_version") == 0) { + if (!ensure_thread_ctx_config(data_out)) { free(value); return false; } + if (data_out->thread_ctx_config->schema_version != NULL) { free(value); return false; } + ((otel_thread_ctx_config_data *)data_out->thread_ctx_config)->schema_version = value; + } else { + char *key = strdup(key_buffer); + if (!key || extra_attributes_index + 2 >= extra_attributes_capacity) { + free(key); + free(value); + return false; + } + data_out->extra_attributes[extra_attributes_index] = key; + data_out->extra_attributes[extra_attributes_index + 1] = value; + extra_attributes_index += 2; + } + } + } + + // Validate all required fields were found + return data_out->deployment_environment_name != NULL && + data_out->service_instance_id != NULL && + data_out->service_name != NULL && + data_out->service_version != NULL && + data_out->telemetry_sdk_language != NULL && + data_out->telemetry_sdk_version != NULL && + data_out->telemetry_sdk_name != NULL; + } + + static void otel_process_ctx_read_data_drop(otel_process_ctx_data data) { + if (data.deployment_environment_name) free((void *)data.deployment_environment_name); + if (data.service_instance_id) free((void *)data.service_instance_id); + if (data.service_name) free((void *)data.service_name); + if (data.service_version) free((void *)data.service_version); + if (data.telemetry_sdk_language) free((void *)data.telemetry_sdk_language); + if (data.telemetry_sdk_version) free((void *)data.telemetry_sdk_version); + if (data.telemetry_sdk_name) free((void *)data.telemetry_sdk_name); + if (data.resource_attributes) { + for (int i = 0; data.resource_attributes[i] != NULL; i++) free((void *)data.resource_attributes[i]); + free((void *)data.resource_attributes); + } + if (data.extra_attributes) { + for (int i = 0; data.extra_attributes[i] != NULL; i++) free((void *)data.extra_attributes[i]); + free((void 
*)data.extra_attributes); + } + if (data.thread_ctx_config) { + if (data.thread_ctx_config->schema_version) free((void *)data.thread_ctx_config->schema_version); + if (data.thread_ctx_config->attribute_key_map) { + for (int i = 0; data.thread_ctx_config->attribute_key_map[i] != NULL; i++) { + free((void *)data.thread_ctx_config->attribute_key_map[i]); + } + free((void *)data.thread_ctx_config->attribute_key_map); + } + free((void *)data.thread_ctx_config); + } + } + + otel_process_ctx_read_result otel_process_ctx_read(void) { + otel_process_ctx_mapping *mapping = try_finding_mapping(); + if (!mapping) { + return (otel_process_ctx_read_result) {.success = false, .error_message = "No OTEL_CTX mapping found (" __FILE__ ":" ADD_QUOTES(__LINE__) ")", .data = empty_data}; + } + + if (strncmp(mapping->otel_process_ctx_signature, OTEL_CTX_SIGNATURE, sizeof(mapping->otel_process_ctx_signature)) != 0 || mapping->otel_process_ctx_version != 2) { + return (otel_process_ctx_read_result) {.success = false, .error_message = "Invalid OTEL_CTX signature or version (" __FILE__ ":" ADD_QUOTES(__LINE__) ")", .data = empty_data}; + } + + otel_process_ctx_data data = empty_data; + + char *key_buffer = (char *) calloc(KEY_VALUE_LIMIT + 1, 1); + char *value_buffer = (char *) calloc(KEY_VALUE_LIMIT + 1, 1); + if (!key_buffer || !value_buffer) { + free(key_buffer); + free(value_buffer); + return (otel_process_ctx_read_result) {.success = false, .error_message = "Failed to allocate decode buffers (" __FILE__ ":" ADD_QUOTES(__LINE__) ")", .data = empty_data}; + } + + bool success = otel_process_ctx_decode_payload(mapping->otel_process_payload, mapping->otel_process_payload_size, &data, key_buffer, value_buffer); + free(key_buffer); + free(value_buffer); + + if (!success) { + otel_process_ctx_read_data_drop(data); + return (otel_process_ctx_read_result) {.success = false, .error_message = "Failed to decode payload (" __FILE__ ":" ADD_QUOTES(__LINE__) ")", .data = empty_data}; + } + + return 
(otel_process_ctx_read_result) {.success = true, .error_message = NULL, .data = data}; + } + + bool otel_process_ctx_read_drop(otel_process_ctx_read_result *result) { + if (!result || !result->success) return false; + otel_process_ctx_read_data_drop(result->data); + *result = (otel_process_ctx_read_result) {.success = false, .error_message = "Data dropped", .data = empty_data}; + return true; + } +#endif // OTEL_PROCESS_CTX_NO_READ + +#endif // OTEL_PROCESS_CTX_NOOP diff --git a/src/datadog/otel_process_ctx.h b/src/datadog/otel_process_ctx.h new file mode 100644 index 000000000..a1ebec6b6 --- /dev/null +++ b/src/datadog/otel_process_ctx.h @@ -0,0 +1,160 @@ +// clang-format off +// This file is a verbatim copy of an upstream reference implementation; do +// not let clang-format rewrite it. + +// The upstream home for this file is +// https://github.com/open-telemetry/sig-profiling/blob/main/process-context/c-and-cpp/otel_process_ctx.h + +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache License (Version 2.0). +// This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + +#pragma once + +#define OTEL_PROCESS_CTX_VERSION_MAJOR 1 +#define OTEL_PROCESS_CTX_VERSION_MINOR 0 +#define OTEL_PROCESS_CTX_VERSION_PATCH 0 +#define OTEL_PROCESS_CTX_VERSION_STRING "1.0.0" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +/** + * # OpenTelemetry Process Context reference implementation + * + * `otel_process_ctx.h` and `otel_process_ctx.c` provide a reference implementation for the OpenTelemetry + * process-level context sharing specification. + * (https://github.com/open-telemetry/opentelemetry-specification/blob/main/oteps/profiles/4719-process-ctx.md) + * + * This reference implementation is Linux-only, as the specification currently only covers Linux. + * On non-Linux OS's (or when OTEL_PROCESS_CTX_NOOP is defined) no-op versions of functions are supplied. 
+ */ + + /** + * Config for the experimental thread context sharing mechanism, see + * https://github.com/open-telemetry/opentelemetry-specification/pull/4947 for usage + * details. + */ +typedef struct { + const char *schema_version; + // NULL-terminated array of attribute key strings to be used in thread context. + // Can be NULL if not needed. + const char **attribute_key_map; +} otel_thread_ctx_config_data; + +/** + * Data that can be published as a process context. + * + * Every string MUST be valid for the duration of the call to `otel_process_ctx_publish`. + * Strings will be copied into the context. + * + * Strings MUST be: + * * Non-NULL + * * UTF-8 encoded + * * Not longer than INT16_MAX bytes + * + * Strings MAY be: + * * Empty + */ +typedef struct { + // https://opentelemetry.io/docs/specs/semconv/registry/attributes/deployment/#deployment-environment-name + const char *deployment_environment_name; + // https://opentelemetry.io/docs/specs/semconv/registry/attributes/service/#service-instance-id + const char *service_instance_id; + // https://opentelemetry.io/docs/specs/semconv/registry/attributes/service/#service-name + const char *service_name; + // https://opentelemetry.io/docs/specs/semconv/registry/attributes/service/#service-version + const char *service_version; + // https://opentelemetry.io/docs/specs/semconv/registry/attributes/telemetry/#telemetry-sdk-language + const char *telemetry_sdk_language; + // https://opentelemetry.io/docs/specs/semconv/registry/attributes/telemetry/#telemetry-sdk-version + const char *telemetry_sdk_version; + // https://opentelemetry.io/docs/specs/semconv/registry/attributes/telemetry/#telemetry-sdk-name + const char *telemetry_sdk_name; + // Additional key/value pairs as resource attributes https://opentelemetry.io/docs/specs/otel/resource/sdk/ + // Can be NULL if no resource attributes are needed; if non-NULL, this array MUST be terminated with a NULL entry. 
+ // Every even entry is a key, every odd entry is a value (E.g. "key1", "value1", "key2", "value2", NULL). + const char **resource_attributes; + // Additional key/value pairs as extra attributes (ProcessContext.extra_attributes in process_context.proto) + // Can be NULL if no extra attributes are needed; if non-NULL, this array MUST be terminated with a NULL entry. + // Every even entry is a key, every odd entry is a value (E.g. "key1", "value1", "key2", "value2", NULL). + const char **extra_attributes; + // Experimental thread context sharing mechanism configuration. See struct definition for details. Can be NULL. + const otel_thread_ctx_config_data *thread_ctx_config; +} otel_process_ctx_data; + +/** Number of entries in the `otel_process_ctx_data` struct. Can be used to easily detect when the struct is updated. */ +#define OTEL_PROCESS_CTX_DATA_ENTRIES sizeof(otel_process_ctx_data) / sizeof(char *) + +typedef struct { + bool success; + const char *error_message; // Static strings only, non-NULL if success is false +} otel_process_ctx_result; + +/** + * Publishes a OpenTelemetry process context with the given data. + * + * The context should remain alive until the application exits (or is just about to exit). + * This method is NOT thread-safe. + * + * Calling `publish` multiple times is supported and will replace a previous context (only one is published at any given + * time). Calling `publish` multiple times usually happens when: + * * Some of the `otel_process_ctx_data` changes due to a live system reconfiguration for the same process + * * The process is forked (to provide a new `service_instance_id`) + * + * This API can be called in a fork of the process that published the previous context, even though + * the context is not carried over into forked processes (although part of its memory allocations are). + * + * @param data Pointer to the data to publish. This data is copied into the context and only needs to be valid for the duration of + * the call. 
Must not be `NULL`. + * @return The result of the operation. + */ +otel_process_ctx_result otel_process_ctx_publish(const otel_process_ctx_data *data); + +/** + * Drops the current OpenTelemetry process context, if any. + * + * This method is safe to call even there's no current context. + * This method is NOT thread-safe. + * + * This API can be called in a fork of the process that published the current context to clean memory allocations + * related to the parent's context (even though the context itself is not carried over into forked processes). + * + * @return `true` if the context was successfully dropped or no context existed, `false` otherwise. + */ +bool otel_process_ctx_drop_current(void); + +/** This can be disabled if no read support is required. */ +#ifndef OTEL_PROCESS_CTX_NO_READ + typedef struct { + bool success; + const char *error_message; // Static strings only, non-NULL if success is false + otel_process_ctx_data data; // Strings are allocated using `malloc` and the caller is responsible for `free`ing them + } otel_process_ctx_read_result; + + /** + * Reads the current OpenTelemetry process context, if any. + * + * Useful for debugging and testing purposes. Underlying returned strings in `data` are dynamically allocated using + * `malloc` and `otel_process_ctx_read_drop` must be called to free them. + * + * Thread-safety: This function assumes there is no concurrent mutation of the process context. + * + * @return The result of the operation. If successful, `data` contains the retrieved context data. + */ + otel_process_ctx_read_result otel_process_ctx_read(void); + + /** + * Drops the data resulting from a previous call to `otel_process_ctx_read`. + * + * @param result The result of a previous call to `otel_process_ctx_read`. Must not be `NULL`. + * @return `true` if the data was successfully dropped, `false` otherwise. 
+ */ + bool otel_process_ctx_read_drop(otel_process_ctx_read_result *result); +#endif + +#ifdef __cplusplus +} +#endif diff --git a/src/datadog/otel_process_ctx_guard.cpp b/src/datadog/otel_process_ctx_guard.cpp new file mode 100644 index 000000000..2f7ac78d5 --- /dev/null +++ b/src/datadog/otel_process_ctx_guard.cpp @@ -0,0 +1,39 @@ +#include "otel_process_ctx_guard.h" + +#include + +#include +#include + +namespace datadog { +namespace tracing { +namespace { + +std::mutex& otel_ctx_mutex() { + static std::mutex m; + return m; +} + +} // namespace + +OtelCtxGuard::~OtelCtxGuard() { + std::lock_guard lock(otel_ctx_mutex()); + otel_process_ctx_drop_current(); +} + +std::unique_ptr publish_otel_process_ctx( + const otel_process_ctx_data& data, Logger& logger) { + std::lock_guard lock(otel_ctx_mutex()); + const auto result = otel_process_ctx_publish(&data); + if (!result.success) { + logger.log_error([&](std::ostream& log) { + log << "Failed to publish OpenTelemetry process context: " + << result.error_message; + }); + return nullptr; + } + return std::make_unique(); +} + +} // namespace tracing +} // namespace datadog diff --git a/src/datadog/otel_process_ctx_guard.h b/src/datadog/otel_process_ctx_guard.h new file mode 100644 index 000000000..e35c26023 --- /dev/null +++ b/src/datadog/otel_process_ctx_guard.h @@ -0,0 +1,32 @@ +#pragma once + +#include + +#include "otel_process_ctx.h" + +namespace datadog { +namespace tracing { + +class Logger; + +// RAII handle for a published OpenTelemetry process context. Destroying the +// guard drops the process-wide context. +// +// Multi-instance behavior: the OTel context is a per-process singleton. +// A successful publish replaces any previously-published context, and +// destroying any guard drops whatever is current (last writer wins style). +// +// This global state is serialized via an internal mutex so this class is +// thread-safe. 
+class OtelCtxGuard { + public: + ~OtelCtxGuard(); +}; + +// Publish the OTel process context. Returns a guard whose destructor drops +// the context, or nullptr on failure (errors are logged via `logger`). +std::unique_ptr publish_otel_process_ctx( + const otel_process_ctx_data& data, Logger& logger); + +} // namespace tracing +} // namespace datadog diff --git a/src/datadog/tracer.cpp b/src/datadog/tracer.cpp index 4209ac228..f00166ea1 100644 --- a/src/datadog/tracer.cpp +++ b/src/datadog/tracer.cpp @@ -22,6 +22,7 @@ #include "hex.h" #include "json.hpp" #include "msgpack.h" +#include "otel_process_ctx_guard.h" #include "platform_util.h" #include "random.h" #include "root_session_id.h" @@ -120,6 +121,10 @@ Tracer::Tracer(const FinalizedTracerConfig& config, store_config(process_tags); } +Tracer::~Tracer() = default; +Tracer::Tracer(Tracer&&) noexcept = default; +Tracer& Tracer::operator=(Tracer&&) noexcept = default; + std::string Tracer::config() const { // clang-format off auto config = nlohmann::json::object({ @@ -160,9 +165,16 @@ void Tracer::store_config( metadata_file_ = std::make_unique(std::move(*maybe_file)); - auto defaults = config_manager_->span_defaults(); - - std::string container_id = ""; + const auto defaults = config_manager_->span_defaults(); + const std::string& runtime_id_string = runtime_id_.string(); + const std::string& tracer_version_value = signature_.library_version; + const std::string tracer_language(signature_.library_language); + const std::string hostname_value = hostname_.value_or(""); + const std::string& service_name = defaults->service; + const std::string& service_env = defaults->environment; + const std::string& service_version = defaults->version; + const std::string process_tags_joined = join_tags(process_tags); + std::string container_id; if (auto maybe_container_id = container::get_id()) { container_id = maybe_container_id->value; } @@ -172,23 +184,57 @@ void Tracer::store_config( // clang-format off msgpack::pack_map( - 
buffer, - "schema_version", [&](auto& buffer) { msgpack::pack_integer(buffer, std::uint64_t(2)); return Expected{}; }, - "runtime_id", [&](auto& buffer) { return msgpack::pack_string(buffer, runtime_id_.string()); }, - "tracer_version", [&](auto& buffer) { return msgpack::pack_string(buffer, signature_.library_version); }, - "tracer_language", [&](auto& buffer) { return msgpack::pack_string(buffer, signature_.library_language); }, - "hostname", [&](auto& buffer) { return msgpack::pack_string(buffer, hostname_.value_or("")); }, - "service_name", [&](auto& buffer) { return msgpack::pack_string(buffer, defaults->service); }, - "service_env", [&](auto& buffer) { return msgpack::pack_string(buffer, defaults->environment); }, - "service_version", [&](auto& buffer) { return msgpack::pack_string(buffer, defaults->version); }, - "process_tags", [&](auto& buffer) { return msgpack::pack_string(buffer, join_tags(process_tags)); }, - "container_id", [&](auto& buffer) { return msgpack::pack_string(buffer, container_id); } + buffer, + "schema_version", [&](auto& buffer) { msgpack::pack_integer(buffer, std::uint64_t(2)); return Expected{}; }, + "runtime_id", [&](auto& buffer) { return msgpack::pack_string(buffer, runtime_id_string); }, + "tracer_version", [&](auto& buffer) { return msgpack::pack_string(buffer, tracer_version_value); }, + "tracer_language", [&](auto& buffer) { return msgpack::pack_string(buffer, tracer_language); }, + "hostname", [&](auto& buffer) { return msgpack::pack_string(buffer, hostname_value); }, + "service_name", [&](auto& buffer) { return msgpack::pack_string(buffer, service_name); }, + "service_env", [&](auto& buffer) { return msgpack::pack_string(buffer, service_env); }, + "service_version", [&](auto& buffer) { return msgpack::pack_string(buffer, service_version); }, + "process_tags", [&](auto& buffer) { return msgpack::pack_string(buffer, process_tags_joined); }, + "container_id", [&](auto& buffer) { return msgpack::pack_string(buffer, container_id); } 
); // clang-format on if (!metadata_file_->write_then_seal(buffer)) { logger_->log_error("Either failed to write or seal the configuration file"); + return; } + +#ifdef __linux__ + // Publish the same metadata as an OpenTelemetry process context. + + // Make sure to leave host.name first... + const char* all_resource_attrs[] = { + "host.name", hostname_value.c_str(), "container.id", container_id.c_str(), + nullptr, + }; + // ...so that we can omit it when it's not available. + const char** resource_attrs = + hostname_ ? all_resource_attrs : all_resource_attrs + 2; + + const char* extra_attrs[] = { + "datadog.process_tags", + process_tags_joined.c_str(), + nullptr, + }; + + otel_process_ctx_data otel_data = {}; + otel_data.deployment_environment_name = service_env.c_str(); + otel_data.service_instance_id = runtime_id_string.c_str(); + otel_data.service_name = service_name.c_str(); + otel_data.service_version = service_version.c_str(); + otel_data.telemetry_sdk_language = tracer_language.c_str(); + otel_data.telemetry_sdk_version = tracer_version_value.c_str(); + otel_data.telemetry_sdk_name = tracer_library_name; + otel_data.resource_attributes = resource_attrs; + otel_data.extra_attributes = extra_attrs; + otel_data.thread_ctx_config = nullptr; + + otel_guard_ = publish_otel_process_ctx(otel_data, *logger_); +#endif } Span Tracer::create_span() { return create_span(SpanConfig{}); } diff --git a/src/datadog/version.cpp b/src/datadog/version.cpp index 8b9710c64..6567442f6 100644 --- a/src/datadog/version.cpp +++ b/src/datadog/version.cpp @@ -2,10 +2,12 @@ namespace datadog::tracing { +#define DD_TRACE_LIBRARY_NAME "dd-trace-cpp" #define DD_TRACE_VERSION "v2.1.2" +const char* const tracer_library_name = DD_TRACE_LIBRARY_NAME; const char* const tracer_version = DD_TRACE_VERSION; const char* const tracer_version_string = - "[dd-trace-cpp version " DD_TRACE_VERSION "]"; + "[" DD_TRACE_LIBRARY_NAME " version " DD_TRACE_VERSION "]"; } // namespace datadog::tracing diff 
--git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7571aa8b7..e2298776e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -29,6 +29,7 @@ add_executable(tests test_glob.cpp test_limiter.cpp test_msgpack.cpp + test_otel_process_ctx.cpp test_platform_util.cpp test_parse_util.cpp test_smoke.cpp diff --git a/test/test_otel_process_ctx.cpp b/test/test_otel_process_ctx.cpp new file mode 100644 index 000000000..fd9a51d40 --- /dev/null +++ b/test/test_otel_process_ctx.cpp @@ -0,0 +1,136 @@ +#include + +#include "mocks/collectors.h" +#include "mocks/loggers.h" +#include "otel_process_ctx.h" +#include "platform_util.h" +#include "test.h" + +namespace fs = std::filesystem; +using namespace datadog::tracing; + +#define OTEL_CTX_TEST(x) TEST_CASE(x, "[otel_process_ctx]") + +namespace { + +std::map to_map(const char** kv) { + std::map out; + if (kv == nullptr) return out; + for (std::size_t i = 0; kv[i] != nullptr && kv[i + 1] != nullptr; i += 2) { + out.emplace(kv[i], kv[i + 1]); + } + return out; +} + +std::map parse_joined_tags(const std::string& s) { + std::map out; + std::istringstream in(s); + for (std::string pair; std::getline(in, pair, ',');) { + const auto colon = pair.find(':'); + out.emplace(pair.substr(0, colon), pair.substr(colon + 1)); + } + return out; +} + +} // namespace + +OTEL_CTX_TEST("Tracer construction publishes OTel process context") { +#ifndef __linux__ + SUCCEED("OpenTelemetry process context is Linux-only"); +#else + std::string expected_container_id; + if (auto id = container::get_id()) { + expected_container_id = id->value; + } + const RuntimeID expected_runtime_id = RuntimeID::generate(); + + std::map expected_process_tags = { + {"custom.tag", "custom-value"}, + }; + expected_process_tags.emplace("entrypoint.name", get_process_name()); + expected_process_tags.emplace("entrypoint.type", "executable"); + expected_process_tags.emplace("entrypoint.workdir", + fs::current_path().filename().string()); + if (auto path = get_process_path()) 
{ + expected_process_tags.emplace("entrypoint.basedir", + path->parent_path().filename().string()); + } + + TracerConfig config; + config.service = "otel-ctx-svc"; + config.environment = "otel-ctx-env"; + config.version = "1.2.3"; + config.runtime_id = expected_runtime_id; + config.report_hostname = true; + config.process_tags = {{"custom.tag", "custom-value"}}; + config.collector = std::make_shared(); + config.logger = std::make_shared(); + + auto finalized = finalize_config(config); + REQUIRE(finalized); + + { + Tracer tracer{*finalized}; + + auto read_result = otel_process_ctx_read(); + REQUIRE(read_result.success); + const auto& data = read_result.data; + + CHECK(std::string(data.service_name) == "otel-ctx-svc"); + CHECK(std::string(data.deployment_environment_name) == "otel-ctx-env"); + CHECK(std::string(data.service_version) == "1.2.3"); + CHECK(std::string(data.service_instance_id) == + expected_runtime_id.string()); + CHECK(std::string(data.telemetry_sdk_language) == "cpp"); + CHECK(std::string(data.telemetry_sdk_name) == "dd-trace-cpp"); + CHECK(std::string(data.telemetry_sdk_version) == tracer_version); + + const std::map expected_resource = { + {"host.name", get_hostname()}, + {"container.id", expected_container_id}, + }; + CHECK(to_map(data.resource_attributes) == expected_resource); + + const auto extra = to_map(data.extra_attributes); + REQUIRE(extra.size() == 1); + REQUIRE(extra.count("datadog.process_tags") == 1); + CHECK(parse_joined_tags(extra.at("datadog.process_tags")) == + expected_process_tags); + + REQUIRE(otel_process_ctx_read_drop(&read_result)); + } + + auto post_read = otel_process_ctx_read(); + CHECK_FALSE(post_read.success); + if (post_read.success) { + otel_process_ctx_read_drop(&post_read); + } +#endif +} + +OTEL_CTX_TEST("host.name is omitted when report_hostname is false") { +#ifndef __linux__ + SUCCEED("OpenTelemetry process context is Linux-only"); +#else + TracerConfig config; + config.service = "otel-ctx-svc"; + 
config.report_hostname = false; + config.collector = std::make_shared(); + config.logger = std::make_shared(); + + auto finalized = finalize_config(config); + REQUIRE(finalized); + + Tracer tracer{*finalized}; + auto read_result = otel_process_ctx_read(); + REQUIRE(read_result.success); + + // Sanity check that context is not empty + CHECK(std::string(read_result.data.service_name) == "otel-ctx-svc"); + + const auto resource = to_map(read_result.data.resource_attributes); + CHECK(resource.count("host.name") == 0); + + REQUIRE(otel_process_ctx_read_drop(&read_result)); +#endif +} diff --git a/test/test_tracer.cpp b/test/test_tracer.cpp index 72231115f..9c4a7da49 100644 --- a/test/test_tracer.cpp +++ b/test/test_tracer.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include "matchers.h" @@ -1882,8 +1883,16 @@ TEST_TRACER("heterogeneous extraction") { REQUIRE(writer.items == test_case.expected_injected_headers); } -TEST_TRACER("move semantics") { - // Verify that `Tracer` can be moved. +TEST_TRACER("move-only semantics") { + static_assert(std::is_move_constructible::value, + "Tracer must be move-constructible"); + static_assert(std::is_move_assignable::value, + "Tracer must be move-assignable"); + static_assert(!std::is_copy_constructible::value, + "Tracer must not be copy-constructible"); + static_assert(!std::is_copy_assignable::value, + "Tracer must not be copy-assignable"); + TracerConfig config; config.service = "testsvc"; config.logger = std::make_shared(); @@ -1891,11 +1900,11 @@ TEST_TRACER("move semantics") { auto finalized_config = finalize_config(config); REQUIRE(finalized_config); - Tracer tracer1{*finalized_config}; - // This must compile. + // The moved-into Tracer must remain usable. + Tracer tracer1{*finalized_config}; Tracer tracer2{std::move(tracer1)}; - (void)tracer2; + { auto span = tracer2.create_span(); } } TEST_TRACER("APM tracing disabled") {