Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/core/src/imednet/core/http/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
wait_random_exponential,
)

from imednet.core.http.circuit_breaker import CircuitBreakerError, get_global_circuit_breaker
from imednet.core.http.handlers import handle_response
from imednet.core.http.monitor import RequestMonitor
from imednet.core.operations.circuit_breaker import CircuitBreakerError, get_global_circuit_breaker
from imednet.core.retry import DefaultRetryPolicy, RetryPolicy, RetryState

_SUPPRESSED_LOG_LEVEL = logging.CRITICAL + 1
Expand Down
Empty file.
149 changes: 149 additions & 0 deletions packages/core/src/imednet/core/operations/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""
Universal execution wrapper that applies exponential backoff retries and circuit breaking
to any compliant operation.
"""

from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, TypeVar

from tenacity import (
AsyncRetrying,
RetryCallState,
RetryError,
Retrying,
stop_after_attempt,
wait_random_exponential,
)

from imednet.core.operations.circuit_breaker import get_global_circuit_breaker
from imednet.core.operations.monitor import OperationMonitor

if TYPE_CHECKING:
from opentelemetry.trace import Tracer
else:
Tracer = Any

logger = logging.getLogger(__name__)

T = TypeVar("T")


class OperationRetryPolicy(ABC):
@abstractmethod
def should_retry(self, exception: Exception) -> bool:
"""Return True if the exception should trigger a retry."""
pass


class DefaultOperationRetryPolicy(OperationRetryPolicy):
def should_retry(self, exception: Exception) -> bool:
# Default fallback: retry on any exception?
# Usually we only retry on specific ones, but for universal wrapper, we can allow everything or leave it configurable
return True


class UniversalExecutor:
"""Execute arbitrary operations with retry, circuit breaking, and telemetry."""

def __init__(
self,
retries: int,
backoff_factor: float,
tracer: Optional[Tracer] = None,
retry_policy: Optional[OperationRetryPolicy] = None,
operation_name: str = "operation",
wait_strategy: Optional[Callable[[RetryCallState], float]] = None,
retry_predicate: Optional[Callable[[RetryCallState], bool]] = None,
**attributes: Any,
) -> None:
self.retries = retries
self.backoff_factor = backoff_factor
self.tracer = tracer
self.retry_policy = retry_policy or DefaultOperationRetryPolicy()
self.operation_name = operation_name
self.attributes = attributes
self._jitter_wait = wait_random_exponential(multiplier=self.backoff_factor)
self.wait_strategy = wait_strategy or (lambda rs: float(self._jitter_wait(rs)))
self.retry_predicate = retry_predicate or self._should_retry_wrapper

def _should_retry_wrapper(self, retry_state: RetryCallState) -> bool:
if retry_state.outcome and retry_state.outcome.failed:
exc = retry_state.outcome.exception()
if isinstance(exc, Exception):
return self.retry_policy.should_retry(exc)
return False

def execute(self, func: Callable[[], T]) -> T:
"""Synchronous execution."""
get_global_circuit_breaker().check_request_allowed()

retryer = Retrying(
stop=stop_after_attempt(self.retries + 1),
wait=self.wait_strategy,
retry=self.retry_predicate,
reraise=False,
)

with OperationMonitor(self.tracer, self.operation_name, **self.attributes) as monitor:
try:
result: Any = retryer(func)
get_global_circuit_breaker().record_success()
monitor.on_success()
return result
except RetryError as e:
get_global_circuit_breaker().record_failure()
cause = e.last_attempt.exception() if e.last_attempt else e
if isinstance(cause, Exception):
try:
monitor.on_retry_error(cause, self.retries)
except Exception as _exc:
if _exc is not cause:
raise
if cause is not None and cause is not e:
raise cause
raise
except Exception as e:
get_global_circuit_breaker().record_failure()
monitor.on_failure(e)
raise

async def execute_async(self, func: Callable[[], Awaitable[T]]) -> T:
"""Asynchronous execution."""
get_global_circuit_breaker().check_request_allowed()

retryer = AsyncRetrying(
stop=stop_after_attempt(self.retries + 1),
wait=self.wait_strategy,
retry=self.retry_predicate,
reraise=False,
)

async with OperationMonitor(self.tracer, self.operation_name, **self.attributes) as monitor:
try:

async def _async_wrapper() -> T:
return await func()

result: Any = await retryer(_async_wrapper)
get_global_circuit_breaker().record_success()
monitor.on_success()
return result
except RetryError as e:
get_global_circuit_breaker().record_failure()
cause = e.last_attempt.exception() if e.last_attempt else e
if isinstance(cause, Exception):
try:
monitor.on_retry_error(cause, self.retries)
except Exception as _exc:
if _exc is not cause:
raise
if cause is not None and cause is not e:
raise cause
raise
except Exception as e:
get_global_circuit_breaker().record_failure()
monitor.on_failure(e)
raise
85 changes: 85 additions & 0 deletions packages/core/src/imednet/core/operations/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from __future__ import annotations

import logging
import time
from contextlib import nullcontext
from typing import TYPE_CHECKING, Any, NoReturn, Optional

if TYPE_CHECKING:
from opentelemetry.trace import Tracer
else:
Tracer = Any

logger = logging.getLogger(__name__)


class OperationMonitor:
"""Helper to handle generic operation monitoring (tracing, timing, logging)."""

def __init__(self, tracer: Optional[Tracer], operation_name: str, **attributes: Any) -> None:
self.tracer = tracer
self.operation_name = operation_name
self.attributes = attributes
self.start_time: float = 0.0
self.span: Any = None
self._cm: Any = None

def _create_cm(self) -> Any:
if self.tracer:
return self.tracer.start_as_current_span(
self.operation_name,
attributes=self.attributes,
)
return nullcontext()

def __enter__(self) -> "OperationMonitor":
self._cm = self._create_cm()
self.span = self._cm.__enter__()
self.start_time = time.monotonic()
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
if self._cm:
self._cm.__exit__(exc_type, exc_val, exc_tb)

async def __aenter__(self) -> "OperationMonitor":
self._cm = self._create_cm()
# Handle async context managers if the tracer supports them
if hasattr(self._cm, "__aenter__"):
self.span = await self._cm.__aenter__()
else:
self.span = self._cm.__enter__()
self.start_time = time.monotonic()
return self

async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
if self._cm:
if hasattr(self._cm, "__aexit__"):
await self._cm.__aexit__(exc_type, exc_val, exc_tb)
else:
self._cm.__exit__(exc_type, exc_val, exc_tb)

def on_success(self, **extra_attributes: Any) -> None:
latency = time.monotonic() - self.start_time
logger.info(
f"{self.operation_name} succeeded",
extra={**self.attributes, **extra_attributes, "latency": latency},
)
if self.span:
for k, v in extra_attributes.items():
self.span.set_attribute(k, v)
self.span.set_attribute("status", "success")

def on_retry_error(self, cause: Exception, retries: int) -> NoReturn:
logger.error(
f"{self.operation_name} failed after retries",
extra={**self.attributes, "retries": retries},
)
if self.span:
self.span.set_attribute("status", "failed")
self.span.set_attribute("retries", retries)
raise cause

def on_failure(self, cause: Exception) -> None:
if self.span:
self.span.set_attribute("status", "failed")
17 changes: 17 additions & 0 deletions packages/core/src/imednet/core/operations/protocols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from typing import Any, Awaitable, Protocol, TypeVar, runtime_checkable

T = TypeVar("T", covariant=True)


@runtime_checkable
class OperationProtocol(Protocol[T]):
"""Protocol for synchronous operations."""

def execute(self, *args: Any, **kwargs: Any) -> T: ...


@runtime_checkable
class AsyncOperationProtocol(Protocol[T]):
"""Protocol for asynchronous operations."""

def execute(self, *args: Any, **kwargs: Any) -> Awaitable[T]: ...
1 change: 1 addition & 0 deletions packages/core/src/imednet/integrations/sink_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class SinkConfig:
extra: dict[str, Any] = field(default_factory=dict)
quality_gate_enabled: bool = False
min_schema_readiness_score: float = 100.0
tracer: Optional[Any] = field(default=None, repr=False)


# ---------------------------------------------------------------------------
Expand Down
84 changes: 35 additions & 49 deletions packages/plugins-sinks/src/imednet_sinks/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,63 +179,49 @@ def _connect(self) -> None:
# ------------------------------------------------------------------

def write_batch(self, records: Sequence[Any], *, batch_id: str) -> int:
"""Write *records* to MongoDB using upsert (idempotent) or insert.

Parameters
----------
records:
Sequence of typed ``Record`` model instances.
batch_id:
Idempotency key (e.g. ``"MYSTUDY/FORM1/0"``).

Returns
-------
int
Number of records written (upserted or inserted).
"""
"""Write *records* to MongoDB using upsert (idempotent) or insert."""
docs = [_record_to_document(r, self._study_key) for r in records]
if not docs:
return 0

last_exc: Optional[Exception] = None
for attempt in range(self.config.max_retries + 1):
try:
if self.config.idempotent:
pymongo = _require_optional_dep("pymongo", "mongodb")
ops = [
pymongo.UpdateOne(
{"_id": doc["_id"]},
{"$set": doc},
upsert=True,
)
for doc in docs
]
result = self._collection.bulk_write(ops, ordered=False)
written = len(docs)
else:
result = self._collection.insert_many(docs, ordered=False)
written = len(result.inserted_ids)

logger.debug("Wrote batch %s (%d records)", batch_id, written)
return written
except Exception as exc: # noqa: BLE001
last_exc = exc
if attempt < self.config.max_retries:
delay = self.config.retry_backoff * (2**attempt)
logger.warning(
"Batch %s attempt %d failed (%s); retrying in %.1fs",
batch_id,
attempt + 1,
exc,
delay,
)
time.sleep(delay)
from imednet.core.operations.executor import UniversalExecutor

raise ExportBatchError(
f"Batch {batch_id!r} failed after {self.config.max_retries + 1} attempts: {last_exc}",
def execute_export() -> int:
if self.config.idempotent:
pymongo = _require_optional_dep("pymongo", "mongodb")
ops = [
pymongo.UpdateOne(
{"_id": doc["_id"]},
{"$set": doc},
upsert=True,
)
for doc in docs
]
self._collection.bulk_write(ops, ordered=False)
written = len(docs)
else:
result = self._collection.insert_many(docs, ordered=False)
written = len(result.inserted_ids)

logger.debug("Wrote batch %s (%d records)", batch_id, written)
return written

executor = UniversalExecutor(
retries=self.config.max_retries,
backoff_factor=self.config.retry_backoff,
tracer=self.config.tracer,
operation_name="export_mongodb",
batch_id=batch_id,
)

try:
return executor.execute(execute_export)
except Exception as exc:
raise ExportBatchError(
f"Batch {batch_id!r} failed after {self.config.max_retries + 1} attempts: {exc}",
batch_id=batch_id,
) from exc

def flush(self) -> None:
"""No-op: MongoDB writes are committed per bulk operation."""

Expand Down
Loading
Loading