diff --git a/libdd-otel-thread-ctx-ffi/src/lib.rs b/libdd-otel-thread-ctx-ffi/src/lib.rs index ec77d043d7..e9e243fac2 100644 --- a/libdd-otel-thread-ctx-ffi/src/lib.rs +++ b/libdd-otel-thread-ctx-ffi/src/lib.rs @@ -24,7 +24,7 @@ pub extern "C" fn ddog_otel_thread_ctx_sanity_check() -> libdd_common_ffi::VoidR #[cfg(target_os = "linux")] mod linux { - use libdd_otel_thread_ctx::linux::{ThreadContext, ThreadContextHandle}; + use libdd_otel_thread_ctx::linux::{OwnedThreadContext, ThreadContext}; use std::ptr::NonNull; /// Maximum size in bytes of the `attrs_data` field of a thread context record. @@ -49,8 +49,8 @@ mod linux { trace_id: &[u8; 16], span_id: &[u8; 8], local_root_span_id: &[u8; 8], - ) -> NonNull { - ThreadContext::new(*trace_id, *span_id, *local_root_span_id, &[]).into_opaque_ptr() + ) -> NonNull { + OwnedThreadContext::new(*trace_id, *span_id, *local_root_span_id, &[]).into_opaque_ptr() } /// Free an owned thread context. @@ -61,9 +61,9 @@ mod linux { /// `ddog_otel_thread_ctx_detach`, and must not be used after this call. In particular, `ctx` /// must not be currently attached to a thread. #[no_mangle] - pub unsafe extern "C" fn ddog_otel_thread_ctx_free(ctx: *mut ThreadContextHandle) { + pub unsafe extern "C" fn ddog_otel_thread_ctx_free(ctx: *mut ThreadContext) { if let Some(ctx) = NonNull::new(ctx) { - let _ = ThreadContext::from_opaque_ptr(ctx); + let _ = OwnedThreadContext::from_opaque_ptr(ctx); } } @@ -77,11 +77,11 @@ mod linux { /// attached. #[no_mangle] pub unsafe extern "C" fn ddog_otel_thread_ctx_attach( - ctx: *mut ThreadContextHandle, - ) -> Option> { - ThreadContext::from_opaque_ptr(NonNull::new(ctx)?) + ctx: *mut ThreadContext, + ) -> Option> { + OwnedThreadContext::from_opaque_ptr(NonNull::new(ctx)?) .attach() - .map(ThreadContext::into_opaque_ptr) + .map(OwnedThreadContext::into_opaque_ptr) } /// Remove the currently attached context from the TLS slot. 
@@ -89,8 +89,8 @@ mod linux { /// Returns the detached context (caller now owns it and must release it with /// `ddog_otel_thread_ctx_free`), or null if the slot was empty. #[no_mangle] - pub extern "C" fn ddog_otel_thread_ctx_detach() -> Option> { - ThreadContext::detach().map(ThreadContext::into_opaque_ptr) + pub extern "C" fn ddog_otel_thread_ctx_detach() -> Option> { + OwnedThreadContext::detach().map(OwnedThreadContext::into_opaque_ptr) } /// Update the currently attached context in-place. @@ -103,6 +103,6 @@ mod linux { span_id: &[u8; 8], local_root_span_id: &[u8; 8], ) { - ThreadContext::update(*trace_id, *span_id, *local_root_span_id, &[]); + OwnedThreadContext::update(*trace_id, *span_id, *local_root_span_id, &[]); } } diff --git a/libdd-otel-thread-ctx/src/lib.rs b/libdd-otel-thread-ctx/src/lib.rs index 10142a4c12..129cf66fe5 100644 --- a/libdd-otel-thread-ctx/src/lib.rs +++ b/libdd-otel-thread-ctx/src/lib.rs @@ -18,15 +18,15 @@ //! This avoids allocation in the hot path. //! //! ```ignore -//! use libdd_otel_thread_ctx::linux::ThreadContext; +//! use libdd_otel_thread_ctx::linux::OwnedThreadContext; //! //! let trace_id = [0u8; 16]; //! let span_id = [1u8; 8]; //! //! // First call allocates a record and attaches it. -//! ThreadContext::new(trace_id, span_id, &[(0, "first")]).attach(); -//! ThreadContext::update(trace_id, span_id, &[(0, "second")]); -//! ThreadContext::detach(); +//! OwnedThreadContext::new(trace_id, span_id, [0u8; 8], &[(0, "first")]).attach(); +//! OwnedThreadContext::update(trace_id, span_id, [0u8; 8], &[(0, "second")]); +//! OwnedThreadContext::detach(); //! ``` //! //! ### Swapping @@ -36,14 +36,14 @@ //! might run on the same thread, or even move from one thread to another, for example. //! //! ```ignore -//! use libdd_otel_thread_ctx::linux::ThreadContext; +//! use libdd_otel_thread_ctx::linux::OwnedThreadContext; //! //! let trace_id = [0u8; 16]; //! let span_id = [1u8; 8]; //! 
let attrs: &[(u8, &str)] = &[(0, "GET"), (1, "/api/v1")]; //! //! // Publish a new context and save the previously attached one (if any). -//! let ctx = ThreadContext::new(trace_id, span_id, attrs); +//! let ctx = OwnedThreadContext::new(trace_id, span_id, [0u8; 8], attrs); //! let previous = ctx.attach(); //! //! // ... do work inside the span ... @@ -73,7 +73,10 @@ pub mod linux { ffi::c_void, mem, ptr::{self, NonNull}, - sync::atomic::{compiler_fence, AtomicPtr, AtomicU8, Ordering}, + sync::{ + atomic::{compiler_fence, AtomicPtr, AtomicU8, Ordering}, + Arc, + }, }; /// Run `f` with an atomic view of the current thread's TLS slot. @@ -84,7 +87,7 @@ pub mod linux { /// /// The slot is read by an async signal handler. Atomic operations should in general use /// [Ordering::Relaxed], but modifications to the record might need additional compiler-only - /// fences (see [ThreadContext::update] for an example). + /// fences (see [`ThreadContext::update`] for an example). fn with_tls_slot(f: F) -> R where F: FnOnce(&AtomicPtr) -> R, @@ -266,6 +269,60 @@ pub mod linux { self.attrs_data_size = offset as u16; fully_encoded } + + /// Update this record in-place. Sets `valid = 0` before the update and `valid = 1` after, + /// so a reader that fires between the two writes sees an inconsistent record and skips it. + /// Compiler fences prevent the compiler from reordering field writes outside that window. + fn update( + &mut self, + trace_id: [u8; 16], + span_id: [u8; 8], + local_root_span_id: [u8; 8], + attrs: &[(u8, &str)], + ) { + self.valid.store(0, Ordering::Relaxed); + compiler_fence(Ordering::SeqCst); + + self.trace_id = trace_id; + self.span_id = span_id; + self.set_attrs(local_root_span_id, attrs); + + compiler_fence(Ordering::SeqCst); + self.valid.store(1, Ordering::Relaxed); + } + + /// Publish `record_ptr` into the current thread's TLS slot and return the raw pointer to + /// the previously attached record (null if none). 
+ /// + /// This is the low-level attach primitive shared by [`OwnedThreadContext`] and + /// [`SharedThreadContext`]. It only moves raw pointers in and out of the slot; the caller + /// is responsible for the ownership semantics of both the passed-in pointer (which is now + /// published and must be kept alive until detached) and the returned one (which must be + /// reclaimed into an owning handle to avoid leaks). + /// + /// # Memory ordering + /// + /// [^tls-slot-ordering]: since we get back the previous record, we could in principle use + /// an `Acquire` (thus combining into an `AcqRel`) compiler fence to make sure we don't get + /// back a not-yet-initialized record. However, this thread (excluding the reader signal + /// handler) is the only one to ever _write_ to the slot, so the store we load the value + /// from automatically happens-before (because it's sequenced-before) the swap. We still + /// need a release fence to avoid exposing uninitialized memory to the handler. + fn attach_raw(record_ptr: *mut ThreadContextRecord) -> *mut ThreadContextRecord { + compiler_fence(Ordering::Release); + with_tls_slot(|slot| slot.swap(record_ptr, Ordering::Relaxed)) + } + + /// Detach the current record from the current thread's TLS slot, returning the raw pointer + /// to the previously attached record (null if none). + /// + /// This is the low-level detach primitive shared by [`OwnedThreadContext`] and + /// [`SharedThreadContext`]. As with [`Self::attach_raw`], the caller is responsible for + /// releasing the returned memory. + fn detach_raw() -> *mut ThreadContextRecord { + // We don't need any fence here, see [^tls-slot-ordering]. + with_tls_slot(|slot| slot.swap(ptr::null_mut(), Ordering::Relaxed)) + } } impl Default for ThreadContextRecord { @@ -282,24 +339,59 @@ pub mod linux { } } + /// A thread-level context. + /// + /// This is the public, value-level view of a thread context. 
It is a thin, transparent + /// wrapper around the internal record layout, intentionally hiding the underlying structure + /// (which requires care to manipulate: async-signal-safety, the seq-lock-like update + /// protocol, etc.). + #[repr(transparent)] + #[derive(Default)] + pub struct ThreadContext(ThreadContextRecord); + + impl ThreadContext { + /// Create a new thread context with the given trace/span IDs and encoded attributes. + pub fn new( + trace_id: [u8; 16], + span_id: [u8; 8], + local_root_span_id: [u8; 8], + attrs: &[(u8, &str)], + ) -> Self { + Self(ThreadContextRecord::new( + trace_id, + span_id, + local_root_span_id, + attrs, + )) + } + + /// Update this context in-place. Sets `valid = 0` before the update and `valid = 1` after, + /// so a reader that fires between the two writes sees an inconsistent record and skips it. + /// Compiler fences prevent the compiler from reordering field writes outside that window. + pub fn update( + &mut self, + trace_id: [u8; 16], + span_id: [u8; 8], + local_root_span_id: [u8; 8], + attrs: &[(u8, &str)], + ) { + self.0.update(trace_id, span_id, local_root_span_id, attrs); + } + } + /// An owned (and non-moving) thread context record allocation. /// /// We don't use `Box` under the hood because it precludes aliasing, while we share the context /// to readers through thread-level context and through the FFI. But it is a boxed /// `ThreadContextRecord` for all intent and purpose. /// - /// The context is `!Send` and `!Sync`; it is supposed to stay on the same thread and is thus + /// Since an owned context can be modified in place, it is `!Send` and `!Sync`. Readers rely on + /// the fact that there can't be any writer while they interrupt the current thread, but this + /// wouldn't be true anymore if we moved `OwnedThreadContext` to a different thread. It is thus /// not thread-safe. - pub struct ThreadContext(NonNull); - - /// Opaque handle to a thread context record.
Used to allow the FFI to convert [ThreadContext] - /// to and from raw pointers without exposing [ThreadContextRecord], as the latter needs extra - /// care to be manipulated (async-signal-safety, seq-lock-like modification protocol through - /// [ThreadContextRecord::valid], etc.) - #[repr(C)] - pub struct ThreadContextHandle {} + pub struct OwnedThreadContext(NonNull); - impl ThreadContext { + impl OwnedThreadContext { /// Create a new thread context with the given trace/span IDs and encoded attributes. pub fn new( trace_id: [u8; 16], @@ -315,7 +407,7 @@ pub mod linux { )) } - /// Turn this thread context into a pointer to the underlying [ThreadContextRecord]. + /// Turn this thread context into a pointer to the underlying [`ThreadContextRecord`]. /// The pointer must be reconstructed through [`Self::from_ptr`] in order to be properly /// dropped, or the record will leak. fn into_ptr(self) -> NonNull { @@ -323,85 +415,51 @@ pub mod linux { mdrop.0 } - /// Turn this thread context into an opaque pointer to the underlying [ThreadContextRecord]. + /// Turn this thread context into an opaque pointer to the underlying [`ThreadContext`]. /// The pointer must be reconstructed through [`Self::from_opaque_ptr`] in order to be /// properly dropped, or the record will leak. - pub fn into_opaque_ptr(self) -> NonNull { + pub fn into_opaque_ptr(self) -> NonNull { let mdrop = mem::ManuallyDrop::new(self); mdrop.0.cast() } - /// Reconstruct a [ThreadContextRecord] from a pointer that comes - /// from [`Self::into_ptr`]. + /// Reconstruct an [`OwnedThreadContext`] from a pointer that comes from + /// [`Self::into_ptr`]. /// /// # Safety /// /// - `ptr` must come from a prior call to [`Self::into_ptr`]. /// - if `ptr` is aliased, accesses through aliases must not be interleaved with method - /// calls on the returned [ThreadContextRecord]. More precisely, mutable references might + /// calls on the returned [`OwnedThreadContext`]. 
More precisely, mutable references might /// be reconstructed during those calls, so any constraint from either Stacked Borrows, /// Tree Borrows or whatever is the current aliasing model implemented in Miri applies. unsafe fn from_ptr(ptr: NonNull) -> Self { Self(ptr) } - /// Reconstruct an [OpaqueThreadContextRecord] from a pointer that comes from + /// Reconstruct an [`OwnedThreadContext`] from a pointer that comes from /// [`Self::into_opaque_ptr`]. /// /// # Safety /// /// - `ptr` must come from a prior call to [`Self::into_opaque_ptr`]. /// - if `ptr` is aliased, accesses through aliases must not be interleaved with method - /// calls on the returned [ThreadContextRecord]. More precisely, mutable references might + /// calls on the returned [`OwnedThreadContext`]. More precisely, mutable references might /// be reconstructed during those calls, so any constraint from either Stacked Borrows, /// Tree Borrows or whatever is the current aliasing model implemented in Miri applies. - pub unsafe fn from_opaque_ptr(ptr: NonNull) -> Self { + pub unsafe fn from_opaque_ptr(ptr: NonNull) -> Self { Self(ptr.cast()) } - } - - impl Default for ThreadContext { - fn default() -> Self { - Self::from(ThreadContextRecord::default()) - } - } - - impl From for ThreadContext { - fn from(record: ThreadContextRecord) -> Self { - // Safety: `Box::into_raw` returns a non-null pointer - unsafe { Self(NonNull::new_unchecked(Box::into_raw(Box::new(record)))) } - } - } - - impl ThreadContext { - /// Atomically swap the current context with a pointer value. Return the previously - /// attached context, if any. - fn swap( - slot: &AtomicPtr, - tgt: *mut ThreadContextRecord, - ) -> Option { - // Safety: a non-null value in the slot came from a prior `into_ptr` call. - NonNull::new(slot.swap(tgt, Ordering::Relaxed)) - .map(|ptr| unsafe { ThreadContext::from_ptr(ptr) }) - } /// Publish a new (or previously detached) thread context record by writing its pointer /// into the TLS slot. 
Returns the previously attached context, if any. /// /// `valid` is already `1` since construction, so any reader that observes the new pointer /// also observes `valid = 1`. - pub fn attach(self) -> Option { - // [^tls-slot-ordering]: since we get back the previous context, we should in principle - // use an `Acquire` (thus combining into an `AcqRel`) compiler fence to make sure we - // don't get back a not-yet-initialized record. - // - // However, this thread (excluding the reader signal handler) is the only one to ever - // _write_ to the context, so the store we load the value from automatically - // happens-before (because it's sequenced-before) the swap. - // - // We still need a release fence to avoid exposing uninitialized memory to the handler. - compiler_fence(Ordering::Release); - with_tls_slot(|slot| Self::swap(slot, self.into_ptr().as_ptr())) + pub fn attach(self) -> Option { + let prev = ThreadContextRecord::attach_raw(self.into_ptr().as_ptr()); + // Safety: a non-null value in the slot came from a prior `into_ptr` call. + NonNull::new(prev).map(|ptr| unsafe { OwnedThreadContext::from_ptr(ptr) }) } /// Update the currently attached record in-place. Sets `valid = 0` before the update and @@ -410,7 +468,7 @@ pub mod linux { /// outside that window. /// /// If there's currently no attached context, `update` will create one, and is in this case - /// equivalent to `ThreadContext::new(trace_id, span_id, attrs).attach()`. + /// equivalent to `OwnedThreadContext::new(..).attach()` with the same arguments. pub fn update( trace_id: [u8; 16], span_id: [u8; 8], @@ -422,38 +480,52 @@ pub mod linux { // and only this thread ever writes to the slot, so the pointer is valid and not // accessed for the duration of this closure.
if let Some(current) = unsafe { slot.load(Ordering::Relaxed).as_mut() } { - current.valid.store(0, Ordering::Relaxed); - compiler_fence(Ordering::SeqCst); - - current.trace_id = trace_id; - current.span_id = span_id; - current.set_attrs(local_root_span_id, attrs); - - compiler_fence(Ordering::SeqCst); - current.valid.store(1, Ordering::Relaxed); + current.update(trace_id, span_id, local_root_span_id, attrs); } else { - let ctxt = ThreadContext::new(trace_id, span_id, local_root_span_id, attrs) - .into_ptr() - .as_ptr(); + let ctxt = + OwnedThreadContext::new(trace_id, span_id, local_root_span_id, attrs) + .into_ptr() + .as_ptr(); // No need for `AcqRel`, see [^tls-slot-ordering]. compiler_fence(Ordering::Release); - // `ThreadContext::new` already initialises `valid = 1`. - let _ = Self::swap(slot, ctxt); + // The slot was null; publish the freshly created record, which + // `OwnedThreadContext::new` already initialises with `valid = 1`. + slot.store(ctxt, Ordering::Relaxed); } }) } /// Detach the current record from the TLS slot. Writes null to the slot and returns the /// detached record. - pub fn detach() -> Option { - // We don't need any fence here, see [^tls-slot-ordering]. - with_tls_slot(|slot| Self::swap(slot, ptr::null_mut())) + pub fn detach() -> Option { + // Safety: a non-null value in the slot came from a prior `into_ptr` call. 
+ NonNull::new(ThreadContextRecord::detach_raw()) + .map(|ptr| unsafe { OwnedThreadContext::from_ptr(ptr) }) + } + } + + impl Default for OwnedThreadContext { + fn default() -> Self { + Self::from(ThreadContextRecord::default()) + } + } + + impl From for OwnedThreadContext { + fn from(record: ThreadContextRecord) -> Self { + // Safety: `Box::into_raw` returns a non-null pointer + unsafe { Self(NonNull::new_unchecked(Box::into_raw(Box::new(record)))) } } } - impl Drop for ThreadContext { + impl From for OwnedThreadContext { + fn from(ctx: ThreadContext) -> Self { + Self::from(ctx.0) + } + } + + impl Drop for OwnedThreadContext { fn drop(&mut self) { - // Safety: `self.0` was obtained from a `Box::new`, and `ThreadContext` represents + // Safety: `self.0` was obtained from a `Box::new`, and `OwnedThreadContext` represents // ownership of the underlying memory. unsafe { let _ = Box::from_raw(self.0.as_ptr()); @@ -461,11 +533,96 @@ pub mod linux { } } + /// A thread-level context shared immutably across threads. + /// + /// Unlike [`OwnedThreadContext`], a shared context holds its record behind an + /// `Arc`. The record can't be mutated in place (there is no `update` method). + /// Attaching and detaching only publish or retract the record pointer in the current thread's + /// TLS slot while updating the `Arc` pointer to ensure the record stays alive for exactly as + /// long as it is attached somewhere. + /// + /// Since attaching is an atomic swap, and a shared context is immutable once created, it can be + /// moved freely between threads. Several threads can also attach the same shared context + /// concurrently and their readers can observe it at the same time. The type is therefore `Send` + /// and `Sync`, in contrast with [`OwnedThreadContext`]. + /// + /// The wrapped `Arc` is recoverable through the [`From`] conversions in both + /// directions, so callers can build, store, clone or share the context through their own + /// machinery. 
+ /// + /// # Mixing with [`OwnedThreadContext`] + /// + /// The TLS slot is untyped: it only holds a record pointer. [`Self::detach`] (and the previous + /// context returned by [`Self::attach`]) assumes the attached record, if any, is a shared one + /// and reconstructs an `Arc` from it. A single thread must therefore not interleave owned and + /// shared attaches on the same slot, or detaching would misinterpret the pointer. Similarly, + /// one should not call [`OwnedThreadContext::update`] after installing a shared context. + /// + /// TODO: statically prevent this situation. + pub struct SharedThreadContext(Arc); + + impl From> for SharedThreadContext { + fn from(inner: Arc) -> Self { + Self(inner) + } + } + + impl From for Arc { + fn from(ctx: SharedThreadContext) -> Self { + ctx.0 + } + } + + impl SharedThreadContext { + /// Reconstruct a [`SharedThreadContext`] from the record pointer stored in the TLS slot, + /// reclaiming the strong reference that [`Self::attach`] moved into the slot. + /// + /// # Safety + /// + /// `ptr` must originate from a prior [`Self::attach`] (i.e. it is the record pointer of an + /// `Arc` whose strong reference was moved into the slot), and that reference + /// must not have been reclaimed yet. + unsafe fn from_record_ptr(ptr: NonNull) -> Self { + // `ThreadContext` is `repr(transparent)` over `ThreadContextRecord`, so the record + // pointer is exactly the `*const ThreadContext` that `Arc::into_raw` produced. + Self(Arc::from_raw(ptr.as_ptr() as *const ThreadContext)) + } + + /// Publish this shared context by writing its record pointer into the current thread's TLS + /// slot, moving the `Arc` into the slot so the record stays alive for as long as it is + /// attached. Returns the previously attached shared context, if any. + pub fn attach(self) -> Option { + // Move the strong reference into the TLS slot; it stays alive until detached.
+ // `ThreadContext` is `repr(transparent)` over `ThreadContextRecord`, so the data + // pointer is also the record pointer readers expect. + // + // Though our `SharedThreadContext` might actually come from a different thread now, + // it's wrapped in an `Arc` that handles drop safety by synchronizing on the reference + // count. The record is already `valid = 1` and, being shared, immutable. + let record_ptr = Arc::into_raw(self.0) as *mut ThreadContextRecord; + // Safety: a non-null value in the slot came from a prior `attach` of a shared + // context (see the type-level note on not mixing owned and shared attaches). + NonNull::new(ThreadContextRecord::attach_raw(record_ptr)) + .map(|ptr| unsafe { Self::from_record_ptr(ptr) }) + } + + /// Detach the currently attached shared context from the TLS slot and return it. Dropping + /// the returned value decrements the strong count that [`Self::attach`] moved into the + /// slot. Returns `None` if the slot was empty. + pub fn detach() -> Option { + // Safety: a non-null value in the slot came from a prior `attach` of a shared + // context (see the type-level note on not mixing owned and shared attaches). + NonNull::new(ThreadContextRecord::detach_raw()) + .map(|ptr| unsafe { Self::from_record_ptr(ptr) }) + } + } + #[cfg(test)] // The tests are set to be ignored by Miri, since accessing the TLS through C isn't supported. mod tests { - use super::{ThreadContext, ThreadContextRecord}; + use super::{OwnedThreadContext, SharedThreadContext, ThreadContext, ThreadContextRecord}; use std::sync::atomic::Ordering; + use std::sync::Arc; /// Read the TLS pointer for the current thread (the value stored in the TLS slot, not the /// address of the slot itself). 
@@ -484,13 +641,13 @@ pub mod linux { read_tls_context_ptr().is_null(), "TLS must be null initially" ); - ThreadContext::new(trace_id, span_id, root_span_id, &[]).attach(); + OwnedThreadContext::new(trace_id, span_id, root_span_id, &[]).attach(); assert!( !read_tls_context_ptr().is_null(), "TLS must not be null after attach" ); - let prev = ThreadContext::detach().unwrap(); + let prev = OwnedThreadContext::detach().unwrap(); unsafe { assert!( @@ -516,7 +673,7 @@ pub mod linux { let span_id = [2u8; 8]; let root_span_id = [3u8; 8]; - ThreadContext::new(trace_id, span_id, root_span_id, &[]).attach(); + OwnedThreadContext::new(trace_id, span_id, root_span_id, &[]).attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null(), "TLS must be non-null after attach"); @@ -529,14 +686,14 @@ pub mod linux { // 1 (key) + 1 (len) + 16 (root_span_id hex chars) = 18 assert_eq!(record.attrs_data_size, 18); - let _ = ThreadContext::detach(); + let _ = OwnedThreadContext::detach(); } #[test] #[cfg_attr(miri, ignore)] fn attribute_encoding_basic() { let attrs: &[(u8, &str)] = &[(1, "GET"), (2, "/api/v1")]; - ThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], attrs).attach(); + OwnedThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], attrs).attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null()); @@ -554,7 +711,7 @@ pub mod linux { assert_eq!(record.attrs_data[24], 7); assert_eq!(&record.attrs_data[25..32], b"/api/v1"); - let _ = ThreadContext::detach(); + let _ = OwnedThreadContext::detach(); } #[test] @@ -576,7 +733,7 @@ pub mod linux { (3, val_c.as_str()), ]; - ThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], attrs).attach(); + OwnedThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], attrs).attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null()); @@ -588,7 +745,7 @@ pub mod linux { assert_eq!(record.attrs_data[275], 2); assert_eq!(record.attrs_data[276], 255); - let _ = ThreadContext::detach(); + let _ = OwnedThreadContext::detach(); } #[test] @@ -602,7 
+759,7 @@ pub mod linux { let root_span_id2 = [0x79, 0x7A, 0x7B, 0x7C, 0x7D, 0x7E, 0x7F, 0x80]; // Updating before any context is attached should be equivalent to `attach()` - ThreadContext::update(trace_id1, span_id1, root_span_id1, &[(0, "v1")]); + OwnedThreadContext::update(trace_id1, span_id1, root_span_id1, &[(0, "v1")]); let ptr_before = read_tls_context_ptr(); assert!(!ptr_before.is_null()); @@ -617,7 +774,7 @@ pub mod linux { assert_eq!(record.attrs_data[19], 2); assert_eq!(&record.attrs_data[20..22], b"v1"); - ThreadContext::update(trace_id2, span_id2, root_span_id2, &[(0, "v2")]); + OwnedThreadContext::update(trace_id2, span_id2, root_span_id2, &[(0, "v2")]); let ptr_after = read_tls_context_ptr(); assert_eq!( @@ -636,21 +793,21 @@ pub mod linux { assert_eq!(record.attrs_data[19], 2); assert_eq!(&record.attrs_data[20..22], b"v2"); - let _ = ThreadContext::detach(); + let _ = OwnedThreadContext::detach(); assert!(read_tls_context_ptr().is_null()); } #[test] #[cfg_attr(miri, ignore)] fn explicit_detach_nulls_tls() { - ThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], &[]).attach(); + OwnedThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], &[]).attach(); assert!(!read_tls_context_ptr().is_null()); - let _ = ThreadContext::detach(); + let _ = OwnedThreadContext::detach(); assert!(read_tls_context_ptr().is_null()); // Calling detach again is safe (no-op, returns None). 
- let _ = ThreadContext::detach(); + let _ = OwnedThreadContext::detach(); assert!(read_tls_context_ptr().is_null()); } @@ -658,7 +815,8 @@ pub mod linux { #[cfg_attr(miri, ignore)] fn long_value_capped_at_255_bytes() { let long_val = "a".repeat(300); - ThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], &[(0, long_val.as_str())]).attach(); + OwnedThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], &[(0, long_val.as_str())]) + .attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null()); @@ -669,7 +827,7 @@ pub mod linux { assert_eq!(val_len, 255, "value must be capped at 255 bytes"); assert_eq!(record.attrs_data_size, 2 + 16 + 2 + 255); - let _ = ThreadContext::detach(); + let _ = OwnedThreadContext::detach(); } // Make sure the C shim is indeed providing a thread-local address. @@ -689,8 +847,13 @@ pub mod linux { let main_root_span_id = [0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xAA]; let handle = std::thread::spawn(move || { - ThreadContext::new(spawned_trace_id, spawned_span_id, spawned_root_span_id, &[]) - .attach(); + OwnedThreadContext::new( + spawned_trace_id, + spawned_span_id, + spawned_root_span_id, + &[], + ) + .attach(); // Let the main thread attach its own record and verify its slot. 
b.wait(); @@ -705,7 +868,7 @@ pub mod linux { assert_eq!(record.span_id, spawned_span_id); assert_eq!(&record.attrs_data[2..18], b"efdecdbcab9a8978"); - let _ = ThreadContext::detach(); + let _ = OwnedThreadContext::detach(); assert!(read_tls_context_ptr().is_null()); }); @@ -717,7 +880,7 @@ pub mod linux { "main thread should see a null pointer and not another thread's context" ); - ThreadContext::new(main_trace_id, main_span_id, main_root_span_id, &[]).attach(); + OwnedThreadContext::new(main_trace_id, main_span_id, main_root_span_id, &[]).attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null(), "main thread TLS must be set"); @@ -728,10 +891,100 @@ pub mod linux { barrier.wait(); - let _ = ThreadContext::detach(); + let _ = OwnedThreadContext::detach(); assert!(read_tls_context_ptr().is_null()); handle.join().unwrap(); } + + #[test] + #[cfg_attr(miri, ignore)] + fn shared_attach_detach_lifecycle() { + let trace_id = [4u8; 16]; + let span_id = [5u8; 8]; + let root_span_id = [6u8; 8]; + + let cell: Arc = + Arc::new(ThreadContext::new(trace_id, span_id, root_span_id, &[])); + + assert!(read_tls_context_ptr().is_null()); + assert_eq!(Arc::strong_count(&cell), 1); + + // Attaching moves a (cloned) strong reference into the slot and publishes the record. + let prev = SharedThreadContext::from(Arc::clone(&cell)).attach(); + assert!(prev.is_none(), "nothing was attached before"); + assert_eq!( + Arc::strong_count(&cell), + 2, + "attach must keep a strong reference alive in the slot" + ); + + let ptr = read_tls_context_ptr(); + assert!(!ptr.is_null(), "TLS must be set after attach"); + let record = unsafe { &*ptr }; + assert_eq!(record.trace_id, trace_id); + assert_eq!(record.span_id, span_id); + assert_eq!(record.valid.load(Ordering::Relaxed), 1); + + // Detaching gives the shared context back and clears the slot. 
+ let detached = SharedThreadContext::detach().expect("a context must be attached"); + assert!( + read_tls_context_ptr().is_null(), + "TLS must be null after detach" + ); + // `detached` still holds the reference that was in the slot: cell + detached = 2. + assert_eq!(Arc::strong_count(&cell), 2); + + drop(detached); + assert_eq!( + Arc::strong_count(&cell), + 1, + "dropping the detached context must release the bumped reference" + ); + + // Detaching again is a no-op. + assert!(SharedThreadContext::detach().is_none()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn shared_attach_replaces_previous() { + let first: Arc = + Arc::new(ThreadContext::new([7u8; 16], [8u8; 8], [9u8; 8], &[])); + let second: Arc = + Arc::new(ThreadContext::new([0xAu8; 16], [0xBu8; 8], [0xCu8; 8], &[])); + + let prev = SharedThreadContext::from(Arc::clone(&first)).attach(); + assert!(prev.is_none(), "nothing was attached before"); + + // Attaching a second context returns the first one and publishes the second. + let prev = SharedThreadContext::from(Arc::clone(&second)) + .attach() + .expect("must return the previously attached context"); + let record = unsafe { &*read_tls_context_ptr() }; + assert_eq!(record.trace_id, [0xAu8; 16]); + assert_eq!(Arc::strong_count(&second), 2, "second is now in the slot"); + + // The returned previous context still holds `first`'s slot reference. + assert_eq!(Arc::strong_count(&first), 2); + drop(prev); + assert_eq!(Arc::strong_count(&first), 1); + + let _ = SharedThreadContext::detach(); + assert!(read_tls_context_ptr().is_null()); + assert_eq!(Arc::strong_count(&second), 1); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn shared_arc_round_trip() { + let cell: Arc = Arc::new(ThreadContext::default()); + let shared = SharedThreadContext::from(Arc::clone(&cell)); + let back: Arc = shared.into(); + assert!( + Arc::ptr_eq(&cell, &back), + "round-trip must preserve the allocation" + ); + } } }