From f313191702121c40f10a3f861e303a13572581a9 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Thu, 28 May 2026 15:39:20 -0700 Subject: [PATCH] bugsnag: propagate cloud_provider into contextual logging for queued executions Adds a private _CLOUD_PROVIDER_ANNOTATION_KEY constant and an execution_logging_context() helper to contextual_logging so the annotation key has a single definition in the OSS layer. In the queued-execution processing span, the helper reads the cloud_provider value from task_spec annotations (already in memory) and adds it to the contextual logging context when present. The existing _before_notify hook then includes it automatically in the tangle_context tab on every Bugsnag event raised during that execution's processing. --- .../instrumentation/contextual_logging.py | 24 ++++++++- cloud_pipelines_backend/orchestrator_sql.py | 2 +- .../test_contextual_logging.py | 54 +++++++++++++++++++ 3 files changed, 78 insertions(+), 2 deletions(-) create mode 100644 tests/instrumentation/test_contextual_logging.py diff --git a/cloud_pipelines_backend/instrumentation/contextual_logging.py b/cloud_pipelines_backend/instrumentation/contextual_logging.py index d9414ab1..f95f8087 100644 --- a/cloud_pipelines_backend/instrumentation/contextual_logging.py +++ b/cloud_pipelines_backend/instrumentation/contextual_logging.py @@ -9,6 +9,7 @@ - execution_id: From ExecutionNode.id - tracks individual execution nodes - container_execution_id: From ContainerExecution.id - tracks running containers - user_id: User who initiated the operation +- cloud_provider: From task_spec annotations, set by execution_logging_context for orchestrated runs - Any other metadata you want to track in logs Usage: @@ -23,7 +24,12 @@ import contextvars from contextlib import contextmanager -from typing import Any, Optional +from typing import TYPE_CHECKING, Any, Optional + +if TYPE_CHECKING: + from .. 
import backend_types_sql as bts + +_CLOUD_PROVIDER_ANNOTATION_KEY = "cloud-pipelines.net/orchestration/cloud_provider" # Single context variable to store all metadata as a dictionary _context_metadata: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar( @@ -125,3 +131,19 @@ def logging_context(**metadata: Any): finally: # Restore previous metadata _context_metadata.set(prev_metadata) + + +def execution_logging_context(execution: "bts.ExecutionNode"): + """Return a logging context populated with metadata for *execution*. + + Always sets ``execution_id``. Also sets ``cloud_provider`` when the + ``cloud-pipelines.net/orchestration/cloud_provider`` annotation is present + on the task spec. + """ + ctx: dict[str, str] = {"execution_id": execution.id} + cloud_provider = ((execution.task_spec or {}).get("annotations") or {}).get( + _CLOUD_PROVIDER_ANNOTATION_KEY + ) + if cloud_provider is not None: + ctx["cloud_provider"] = cloud_provider + return logging_context(**ctx) diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index cabb11a8..1a81f456 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -131,7 +131,7 @@ def internal_process_queued_executions_queue(self, session: orm.Session): self._queued_executions_queue_idle = False start_timestamp = time.monotonic_ns() - with contextual_logging.logging_context(execution_id=queued_execution.id): + with contextual_logging.execution_logging_context(queued_execution): _logger.info("Before processing queued execution") try: self.internal_process_one_queued_execution( diff --git a/tests/instrumentation/test_contextual_logging.py b/tests/instrumentation/test_contextual_logging.py new file mode 100644 index 00000000..b161fce7 --- /dev/null +++ b/tests/instrumentation/test_contextual_logging.py @@ -0,0 +1,54 @@ +"""Tests for contextual_logging.execution_logging_context.""" + +from cloud_pipelines_backend import 
backend_types_sql as bts +from cloud_pipelines_backend.instrumentation import contextual_logging + +_CLOUD_PROVIDER_KEY = "cloud-pipelines.net/orchestration/cloud_provider" + + +def _make_execution(*, task_spec: dict | None = None) -> bts.ExecutionNode: + node = bts.ExecutionNode(task_spec=task_spec or {}) + node.id = "test-execution-id" + node.extra_data = {} + return node + + +class TestExecutionLoggingContext: + def test_always_sets_execution_id(self): + execution = _make_execution() + with contextual_logging.execution_logging_context(execution): + assert ( + contextual_logging.get_context_metadata("execution_id") + == "test-execution-id" + ) + + def test_no_cloud_provider_when_annotation_absent(self): + execution = _make_execution(task_spec={}) + with contextual_logging.execution_logging_context(execution): + assert contextual_logging.get_context_metadata("cloud_provider") is None + + def test_sets_cloud_provider_from_annotation(self): + execution = _make_execution( + task_spec={"annotations": {_CLOUD_PROVIDER_KEY: "nebius"}} + ) + with contextual_logging.execution_logging_context(execution): + assert contextual_logging.get_context_metadata("cloud_provider") == "nebius" + + def test_no_cloud_provider_when_task_spec_is_none(self): + execution = _make_execution(task_spec=None) + with contextual_logging.execution_logging_context(execution): + assert contextual_logging.get_context_metadata("cloud_provider") is None + + def test_no_cloud_provider_when_annotations_is_none(self): + execution = _make_execution(task_spec={"annotations": None}) + with contextual_logging.execution_logging_context(execution): + assert contextual_logging.get_context_metadata("cloud_provider") is None + + def test_context_is_cleared_after_block(self): + execution = _make_execution( + task_spec={"annotations": {_CLOUD_PROVIDER_KEY: "gcp"}} + ) + with contextual_logging.execution_logging_context(execution): + pass + assert contextual_logging.get_context_metadata("execution_id") is None + 
assert contextual_logging.get_context_metadata("cloud_provider") is None