Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cloud_pipelines_backend/instrumentation/contextual_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
from contextlib import contextmanager
from typing import Any, Optional

from .. import backend_types_sql as bts

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This is an AI-generated code review comment.

New instrumentation → core import is a circular-import hazard.

from .. import backend_types_sql as bts

The instrumentation package was previously dependency-free w.r.t. core model code — bugsnag_instrumentation.py, error_normalization.py, and the rest don't import from ... This is the first instrumentation → core import. If backend_types_sql ever wants to call back into instrumentation for logging (a plausible future change — e.g. logging inside an __post_init__ or validator), you get an import cycle.

The only runtime use of bts here is the type annotation on execution: bts.ExecutionNode. That doesn't need a real import.

Fix: guard with TYPE_CHECKING:

from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from .. import backend_types_sql as bts

bts.ExecutionNode becomes a forward reference resolved by the type checker, with zero runtime cost and no cycle risk.


_CLOUD_PROVIDER_ANNOTATION_KEY = "cloud-pipelines.net/orchestration/cloud_provider"

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This is an AI-generated code review comment.

Module docstring's metadata-key list should mention cloud_provider.

After this PR, cloud_provider starts appearing in logs for queued executions. The module docstring at the top of this file enumerates the common metadata keys (request_id, pipeline_run_id, execution_id, etc.) — but cloud_provider isn't listed. People grepping the docstring to learn "what metadata keys exist?" won't find it.

Fix: add one line to the docstring's "Common metadata keys" list:

- cloud_provider: From task_spec annotations, set by execution_logging_context for orchestrated runs.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nebius_launchers.py has this same exact constant. Would it make sense to create a shared config/model/etc file that both of them reference against? Or some way to not duplicate the variable?


# Single context variable to store all metadata as a dictionary
_context_metadata: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
"context_metadata", default={}
Expand Down Expand Up @@ -125,3 +129,21 @@ def logging_context(**metadata: Any):
finally:
# Restore previous metadata
_context_metadata.set(prev_metadata)


def execution_logging_context(execution: bts.ExecutionNode):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This is an AI-generated code review comment.

No unit tests for execution_logging_context.

The new function has several non-trivial branches: task_spec is None, task_spec without annotations, annotations without the cloud-provider key, the key set to a string, the key set to None, the key with an empty string. None of these is pinned by a test, and the orchestrator call site relies on the helper being correct.

Fix: add a handful of cases in tests/instrumentation/test_contextual_logging.py:

def test_execution_logging_context_no_annotations():
    exc = make_execution_node(id="e1", task_spec={})
    with contextual_logging.execution_logging_context(exc):
        assert contextual_logging.get_context_metadata("execution_id") == "e1"
        assert contextual_logging.get_context_metadata("cloud_provider") is None

def test_execution_logging_context_with_cloud_provider():
    exc = make_execution_node(
        id="e1",
        task_spec={"annotations": {"cloud-pipelines.net/orchestration/cloud_provider": "gcp"}},
    )
    with contextual_logging.execution_logging_context(exc):
        assert contextual_logging.get_context_metadata("cloud_provider") == "gcp"

def test_execution_logging_context_annotations_is_none():
    # Regression guard for None handling (see other comment).
    exc = make_execution_node(id="e1", task_spec={"annotations": None})
    with contextual_logging.execution_logging_context(exc):
        assert contextual_logging.get_context_metadata("cloud_provider") is None

"""Return a logging context populated with metadata for *execution*.

Always sets ``execution_id``. Also sets ``cloud_provider`` when the
``cloud-pipelines.net/orchestration/cloud_provider`` annotation is present
on the task spec.
"""
ctx: dict[str, str] = {"execution_id": execution.id}
cloud_provider = (
(execution.task_spec or {})
.get("annotations", {})

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This is an AI-generated code review comment.

annotations: None (key present, value None) makes the chained .get crash.

cloud_provider = (
    (execution.task_spec or {})
    .get("annotations", {})           # returns None, NOT {}, if annotations is None
    .get(_CLOUD_PROVIDER_ANNOTATION_KEY)
)

.get("annotations", {}) only uses the default when the key is missing. If task_spec = {"annotations": None} (key present, value None — common in JSON-roundtripped data where keys are emitted even when unset), .get("annotations", {}) returns None and the trailing .get(...) raises AttributeError: 'NoneType' object has no attribute 'get'. The crash propagates out of execution_logging_context, breaking the with block setup at the call site and silently degrading log correlation for that execution.

Fix: chain or {} after the inner .get too:

cloud_provider = (
    ((execution.task_spec or {}).get("annotations") or {})
    .get(_CLOUD_PROVIDER_ANNOTATION_KEY)
)

.get(_CLOUD_PROVIDER_ANNOTATION_KEY)
)
if cloud_provider is not None:
ctx["cloud_provider"] = cloud_provider
return logging_context(**ctx)
2 changes: 1 addition & 1 deletion cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
self._queued_executions_queue_idle = False
start_timestamp = time.monotonic_ns()

with contextual_logging.logging_context(execution_id=queued_execution.id):
with contextual_logging.execution_logging_context(queued_execution):
_logger.info("Before processing queued execution")
try:
self.internal_process_one_queued_execution(
Expand Down