Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions src/bedrock_agentcore/memory/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
backward compatibility.
"""

import contextvars
import copy
import logging
import time
Expand Down Expand Up @@ -42,6 +43,40 @@
logger = logging.getLogger(__name__)


class _ContextIsolatingProxy:
"""Wraps a boto3 client so that each method call runs inside a context copy.

When the SDK is used from an asyncio task or a ThreadPoolExecutor thread that
already has an OpenTelemetry context attached, the OTel boto3 auto-instrumentation
(e.g. ``aws-opentelemetry-distro``) calls ``opentelemetry.context.attach()`` and
then ``opentelemetry.context.detach()`` around each API call. Python's
``ContextVar.reset()`` raises ``ValueError`` if the token was created in a
different execution context than the one calling ``reset()`` — which can happen
when an asyncio task inherits OTel context from a parent task or when a thread
pool worker is used across ``await`` boundaries.

Running each call inside ``contextvars.copy_context().run()`` creates an
isolated snapshot so the OTel attach/detach pair is always contained within the
same context, preventing the ``ValueError``.
"""

__slots__ = ("_client", "meta")

def __init__(self, client: Any) -> None:
object.__setattr__(self, "_client", client)
object.__setattr__(self, "meta", client.meta)

def __getattr__(self, name: str) -> Any:
attr = getattr(object.__getattribute__(self, "_client"), name)
if callable(attr):

def _wrapper(*args: Any, **kwargs: Any) -> Any:
return contextvars.copy_context().run(attr, *args, **kwargs)

return _wrapper
return attr


class MemoryClient:
"""High-level Bedrock AgentCore Memory client with essential operations."""

Expand Down Expand Up @@ -95,10 +130,12 @@ def __init__(
user_agent_extra = build_user_agent_suffix(integration_source)
client_config = Config(user_agent_extra=user_agent_extra)

self.gmcp_client = session.client(
"bedrock-agentcore-control", region_name=self.region_name, config=client_config
self.gmcp_client = _ContextIsolatingProxy(
session.client("bedrock-agentcore-control", region_name=self.region_name, config=client_config)
)
self.gmdp_client = _ContextIsolatingProxy(
session.client("bedrock-agentcore", region_name=self.region_name, config=client_config)
)
self.gmdp_client = session.client("bedrock-agentcore", region_name=self.region_name, config=client_config)

logger.info(
"Initialized MemoryClient for control plane: %s, data plane: %s",
Expand Down