From 68f1cb320548f599550e90e5f73a36eb2972f193 Mon Sep 17 00:00:00 2001 From: citizen204 Date: Sat, 27 Jun 2026 18:28:27 +0930 Subject: [PATCH] fix(memory): wrap boto3 clients in context-isolating proxy to silence OTel detach errors When MemoryClient is used from an asyncio task that already has an OTel context attached (e.g. via aws-opentelemetry-distro), the boto3 auto-instrumentation calls opentelemetry.context.attach() then detach() around every API call. If the call crosses an asyncio task or thread-pool boundary, Python raises: ValueError: was created in a different Context because ContextVar.reset() only accepts tokens created in the current execution context. The error is logged at ERROR level by the OTel runtime on every boto3 call, polluting CloudWatch and obscuring real failures. _ContextIsolatingProxy wraps each method call in contextvars.copy_context().run() so the OTel attach/detach pair is fully contained within the copied context snapshot -- reset() always matches the token created in the same context. Applies to both gmdp_client and gmcp_client at construction time, covering every call site in MemoryClient and AgentCoreMemorySessionManager without any per-call changes. Fixes #456 --- src/bedrock_agentcore/memory/client.py | 43 ++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/src/bedrock_agentcore/memory/client.py b/src/bedrock_agentcore/memory/client.py index 421dfcff..51128342 100644 --- a/src/bedrock_agentcore/memory/client.py +++ b/src/bedrock_agentcore/memory/client.py @@ -8,6 +8,7 @@ backward compatibility. """ +import contextvars import copy import logging import time @@ -42,6 +43,40 @@ logger = logging.getLogger(__name__) +class _ContextIsolatingProxy: + """Wraps a boto3 client so that each method call runs inside a context copy. + + When the SDK is used from an asyncio task or a ThreadPoolExecutor thread that + already has an OpenTelemetry context attached, the OTel boto3 auto-instrumentation + (e.g. ``aws-opentelemetry-distro``) calls ``opentelemetry.context.attach()`` and + then ``opentelemetry.context.detach()`` around each API call. Python's + ``ContextVar.reset()`` raises ``ValueError`` if the token was created in a + different execution context than the one calling ``reset()`` — which can happen + when an asyncio task inherits OTel context from a parent task or when a thread + pool worker is used across ``await`` boundaries. + + Running each call inside ``contextvars.copy_context().run()`` creates an + isolated snapshot so the OTel attach/detach pair is always contained within the + same context, preventing the ``ValueError``. + """ + + __slots__ = ("_client", "meta") + + def __init__(self, client: Any) -> None: + object.__setattr__(self, "_client", client) + object.__setattr__(self, "meta", client.meta) + + def __getattr__(self, name: str) -> Any: + attr = getattr(object.__getattribute__(self, "_client"), name) + if callable(attr): + + def _wrapper(*args: Any, **kwargs: Any) -> Any: + return contextvars.copy_context().run(attr, *args, **kwargs) + + return _wrapper + return attr + + class MemoryClient: """High-level Bedrock AgentCore Memory client with essential operations.""" @@ -95,10 +130,12 @@ def __init__( user_agent_extra = build_user_agent_suffix(integration_source) client_config = Config(user_agent_extra=user_agent_extra) - self.gmcp_client = session.client( - "bedrock-agentcore-control", region_name=self.region_name, config=client_config + self.gmcp_client = _ContextIsolatingProxy( + session.client("bedrock-agentcore-control", region_name=self.region_name, config=client_config) + ) + self.gmdp_client = _ContextIsolatingProxy( + session.client("bedrock-agentcore", region_name=self.region_name, config=client_config) ) - self.gmdp_client = session.client("bedrock-agentcore", region_name=self.region_name, config=client_config) logger.info( "Initialized MemoryClient for control plane: %s, data plane: %s",