Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,21 @@
"ExecutionTimeout": 300
},
"path": "./src/plugin/execution_with_otel.py"
},
{
"name": "Otel Logger Example",
"description": "Demonstrates OTel-enriched logging correlated to durable spans",
"handler": "otel_logger_example.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"loggingConfig": {
"ApplicationLogLevel": "INFO",
"LogFormat": "JSON"
},
"path": "./src/otel/otel_logger_example.py"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Demonstrates OTel-enriched logging in a durable execution.

The DurableExecutionOtelPlugin wraps the execution logger (enrich_logger=True
by default) so every log line emitted through context.logger / step_context.logger
is automatically enriched with the active OpenTelemetry trace context
(otel.trace_id, otel.span_id, otel.trace_sampled). This lets logs correlate to
the spans the plugin emits without any user code changes.

Logs emitted:
- at the top level correlate to the invocation span
- inside a step correlate to that step's span
- inside a child context correlate to the child-context span
"""

from typing import Any

from aws_durable_execution_sdk_python_otel import DurableExecutionOtelPlugin
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

from aws_durable_execution_sdk_python import StepContext
from aws_durable_execution_sdk_python.context import (
DurableContext,
durable_step,
durable_with_child_context,
)
from aws_durable_execution_sdk_python.execution import durable_execution


tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# enrich_logger defaults to True, so the execution logger is wrapped with OTel
# trace context injection (otel.trace_id, otel.span_id, otel.trace_sampled).
otel = DurableExecutionOtelPlugin(tracer_provider)


@durable_step
def greet(step_context: StepContext, name: str) -> str:
# Logged inside a step: enriched with this step's span_id.
# Note: avoid reserved LogRecord keys (e.g. "name") in extra.
step_context.logger.info("Greeting inside step", extra={"greeting_name": name})
return f"hello {name}"


@durable_with_child_context
def greet_in_child(child_context: DurableContext, name: str) -> str:
# Logged inside a child context: enriched with the child-context span_id.
child_context.logger.info("Entering child context")
result: str = child_context.step(greet(name), name="child-greet")
child_context.logger.info("Leaving child context", extra={"result": result})
return result


@durable_execution(plugins=[otel])
def handler(_event: Any, context: DurableContext) -> str:
# Logged at the top level: enriched with the invocation span_id.
context.logger.info("Workflow started")

top: str = context.step(greet("world"), name="top-greet")
nested: str = context.run_in_child_context(
greet_in_child("nested"), name="child-context"
)

context.logger.info("Workflow completed", extra={"top": top, "nested": nested})
return f"{top} | {nested}"
18 changes: 18 additions & 0 deletions packages/aws-durable-execution-sdk-python-examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,24 @@
"ExecutionTimeout": 300
}
}
},
"OtelLoggerExample": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "otel_logger_example.handler",
"Description": "Demonstrates OTel-enriched logging correlated to durable spans",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Tests for the OTel-enriched logger example."""

import pytest

from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationType
from src.otel import otel_logger_example
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=otel_logger_example.handler,
lambda_function_name="Otel Logger Example",
)
def test_otel_logger_example(durable_runner):
"""Verify the OTel logger example runs and produces the expected result."""
with durable_runner:
result = durable_runner.run(input="{}", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == "hello world | hello nested"

# The top-level step is named "top-greet".
top_step = result.get_step("top-greet")
assert deserialize_operation_payload(top_step.result) == "hello world"

# The child context wraps a nested step, so a CONTEXT operation exists.
context_ops = [
op for op in result.operations if op.operation_type is OperationType.CONTEXT
]
assert len(context_ops) >= 1
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from aws_durable_execution_sdk_python_otel.deterministic_id_generator import (
DeterministicIdGenerator,
)
from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger
from aws_durable_execution_sdk_python_otel.plugin import (
DurableExecutionOtelPlugin,
)
Expand All @@ -19,6 +20,7 @@
"ContextExtractor",
"DeterministicIdGenerator",
"DurableExecutionOtelPlugin",
"OtelEnrichedLogger",
"w3c_client_context_extractor",
"xray_context_extractor",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""OTel-enriched logger for durable executions.

Provides a LoggerInterface wrapper that injects OpenTelemetry trace context
(trace_id, span_id, trace_sampled) into every log message's extra dict. This
enables log-trace correlation in observability backends without changing user code.
"""

from __future__ import annotations

from collections.abc import Mapping
from typing import TYPE_CHECKING

from opentelemetry.trace import TraceFlags


if TYPE_CHECKING:
from aws_durable_execution_sdk_python.types import LoggerInterface

from aws_durable_execution_sdk_python_otel.plugin import DurableExecutionOtelPlugin


class OtelEnrichedLogger:
"""LoggerInterface wrapper that injects OTel trace context into log extra fields.

The span context is resolved by the plugin via get_current_span_context(),
which returns the active operation span inside steps and the invocation span
for top-level handler code.

Injected fields:
- otel.trace_id: 32-char hex trace identifier
- otel.span_id: 16-char hex span identifier
- otel.trace_sampled: boolean indicating if the trace is sampled

Args:
inner: The underlying logger to delegate to after enrichment.
plugin: The OTel plugin instance that resolves the current span context.
"""

def __init__(
self, inner: LoggerInterface, plugin: DurableExecutionOtelPlugin
) -> None:
self._inner = inner
self._plugin = plugin

def debug(
self, msg: object, *args: object, extra: Mapping[str, object] | None = None
) -> None:
self._inner.debug(msg, *args, extra=self._enrich(extra))

def info(
self, msg: object, *args: object, extra: Mapping[str, object] | None = None
) -> None:
self._inner.info(msg, *args, extra=self._enrich(extra))

def warning(
self, msg: object, *args: object, extra: Mapping[str, object] | None = None
) -> None:
self._inner.warning(msg, *args, extra=self._enrich(extra))

def error(
self, msg: object, *args: object, extra: Mapping[str, object] | None = None
) -> None:
self._inner.error(msg, *args, extra=self._enrich(extra))

def exception(
self, msg: object, *args: object, extra: Mapping[str, object] | None = None
) -> None:
self._inner.exception(msg, *args, extra=self._enrich(extra))

def _enrich(self, extra: Mapping[str, object] | None) -> dict[str, object]:
"""Inject OTel trace context into the extra dict.

trace_id, span_id, and trace_sampled come from the span context resolved
by the plugin, so the values always match the exported spans.
"""
enriched: dict[str, object] = dict(extra) if extra else {}

span_context = self._plugin.get_current_span_context()
if span_context and span_context.is_valid:
enriched["otel.trace_id"] = format(span_context.trace_id, "032x")
enriched["otel.span_id"] = format(span_context.span_id, "016x")
enriched["otel.trace_sampled"] = bool(
span_context.trace_flags & TraceFlags.SAMPLED
)

return enriched
Loading