diff --git a/CLAUDE.md b/CLAUDE.md index 53d09c7e9..36f520ccd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -20,8 +20,10 @@ fastapi run --reload app/main.py # Run pre-commit hooks uv run pre-commit run --all-files -# Generate database migration (rev-id should be latest existing revision ID + 1) -alembic revision --autogenerate -m "Description" --rev-id 040 +# Generate database migration. +# Compute at runtime as the latest existing revision ID + 1, +# zero-padded to 3 digits (check the highest NNN in app/alembic/versions/NNN_*.py). +alembic revision --autogenerate -m "Description" --rev-id # Seed database with test data uv run python -m app.seed_data.seed_data diff --git a/backend/app/alembic/versions/064_add_run_mode_to_evaluation_run.py b/backend/app/alembic/versions/064_add_run_mode_to_evaluation_run.py new file mode 100644 index 000000000..d7c5beb23 --- /dev/null +++ b/backend/app/alembic/versions/064_add_run_mode_to_evaluation_run.py @@ -0,0 +1,85 @@ +"""add run_mode column and unique run-name constraint to evaluation_run + +Revision ID: 064 +Revises: 063 +Create Date: 2026-05-20 00:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "064" +down_revision = "063" +branch_labels = None +depends_on = None + +disable_per_migration_transaction = True + +_UNIQUE_INDEX = "uq_evaluation_run_org_project_run_name" +_UNIQUE_CONSTRAINT = "uq_evaluation_run_org_project_run_name" + + +def upgrade(): + # 1. Add run_mode as nullable first so existing rows are backfilled by the + # server default, then tighten to NOT NULL. The server default is left + # in place as a safety net. + with op.get_context().autocommit_block(): + op.add_column( + "evaluation_run", + sa.Column( + "run_mode", + sa.String(length=10), + nullable=True, + server_default=sa.text("'batch'"), + comment="Execution mode: batch or fast", + ), + ) + op.execute("ALTER TABLE evaluation_run ALTER COLUMN run_mode SET NOT NULL") + + # 2. Resolve duplicate (organization_id, project_id, run_name) tuples + # non-destructively before adding the unique constraint. Keep the + # lowest-id row's run_name untouched and rename the rest by appending a + # unique "__dup_" suffix so no historical run (and its scores, result + # URLs, or batch_job links) is lost. + with op.get_context().autocommit_block(): + op.execute( + """ + UPDATE evaluation_run e + SET run_name = e.run_name || '__dup_' || e.id + WHERE e.id <> ( + SELECT MIN(x.id) + FROM evaluation_run x + WHERE x.organization_id = e.organization_id + AND x.project_id = e.project_id + AND x.run_name = e.run_name + ) + """ + ) + + # 3. Build the unique index CONCURRENTLY so the scan does not take an + # AccessExclusiveLock, then attach it as a named constraint via + # ADD CONSTRAINT ... USING INDEX (brief catalog-only lock). + with op.get_context().autocommit_block(): + op.execute( + f"CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS " + f'"{_UNIQUE_INDEX}" ' + f"ON evaluation_run (organization_id, project_id, run_name)" + ) + op.execute( + f"ALTER TABLE evaluation_run " + f'ADD CONSTRAINT "{_UNIQUE_CONSTRAINT}" ' + f'UNIQUE USING INDEX "{_UNIQUE_INDEX}"' + ) + + +def downgrade(): + # Reverse in opposite order to upgrade(). + op.execute( + f"ALTER TABLE evaluation_run " + f'DROP CONSTRAINT IF EXISTS "{_UNIQUE_CONSTRAINT}"' + ) + with op.get_context().autocommit_block(): + op.execute(f'DROP INDEX CONCURRENTLY IF EXISTS "{_UNIQUE_INDEX}"') + op.drop_column("evaluation_run", "run_mode") diff --git a/backend/app/api/docs/evaluation/create_evaluation.md b/backend/app/api/docs/evaluation/create_evaluation.md index a719579b2..3e9d7b636 100644 --- a/backend/app/api/docs/evaluation/create_evaluation.md +++ b/backend/app/api/docs/evaluation/create_evaluation.md @@ -1,16 +1,22 @@ -Start an evaluation run using the OpenAI Batch API. +Start an evaluation run against a stored dataset. -Evaluations allow you to systematically test LLM configurations against -predefined datasets with automatic progress tracking and result collection. +Two execution modes are supported via the optional `run_mode` field: + +* `batch` (default) — submits the work to the OpenAI Batch API. Cost-efficient + for large datasets; turnaround can take up to 24 hours. +* `fast` — runs the evaluation synchronously through the OpenAI Responses API + and returns results within seconds-to-minutes. Restricted to text + evaluations on datasets with at most `EVAL_FAST_MAX_UNIQUE_ROWS` unique rows. **Key Features:** -* Fetches dataset items from Langfuse and creates a batch processing job via the OpenAI Batch API -* Asynchronous processing with automatic progress tracking (checks every 60s) +* Fetches dataset items from Langfuse and creates a job (batch or fast) * Uses a stored config (created via `/configs`) to define the provider parameters -* Stores results for comparison and analysis -* Use `GET /evaluations/{evaluation_id}` to monitor progress and retrieve results +* Same scoring semantics across both modes — cosine similarity, Langfuse traces, + and optional LLM-as-Judge correctness +* Use `GET /evaluations/{evaluation_id}` to monitor progress and retrieve results; + the response carries `run_mode` so clients can tell the two paths apart -## Example +## Example (batch — default) ```json { @@ -20,3 +26,32 @@ predefined datasets with automatic progress tracking and result collection. "config_version": 1 } ``` + +## Example (fast) + +```json +{ + "dataset_id": 123, + "experiment_name": "may19-temp0.2-gpt4o-fast", + "config_id": "f54f0d67-4817-4103-9fdf-b74b3d46733e", + "config_version": 1, + "run_mode": "fast" +} +``` + +## Fast-mode error responses + +These apply only when `run_mode` is `fast`. + +| Status | Code | When | +| --- | --- | --- | +| 422 | `config_type_unsupported` | Resolved config is not a text-evaluation config | +| 422 | `dataset_too_large_for_fast` | Dataset exceeds `EVAL_FAST_MAX_UNIQUE_ROWS` unique rows | + +## General error responses + +These apply to both `batch` and `fast` modes. + +| Status | Code | When | +| --- | --- | --- | +| 409 | `run_name_already_exists` | A run with the same `experiment_name` already exists for this (organization, project) | diff --git a/backend/app/api/docs/evaluation/list_datasets.md b/backend/app/api/docs/evaluation/list_datasets.md index e315db1d0..36b7cc1c4 100644 --- a/backend/app/api/docs/evaluation/list_datasets.md +++ b/backend/app/api/docs/evaluation/list_datasets.md @@ -1,3 +1,10 @@ List all datasets for the current organization and project. -Returns a paginated list of datasets ordered by most recent first. Each dataset includes metadata (ID, name, item counts, duplication factor), Langfuse integration details, and object store URL. +Returns a paginated list of datasets ordered by most recent first. Each dataset includes metadata (ID, name, item counts, duplication factor), Langfuse integration details, object store URL, and an `eligible_for_fast` flag that is `true` when the dataset's unique-row count is within `EVAL_FAST_MAX_UNIQUE_ROWS` (and so can be used with `run_mode="fast"` on `POST /evaluations`). + +## Query parameters + +| Parameter | Description | +| --- | --- | +| `limit` / `offset` | Pagination (default 50 / 0; max limit 100) | +| `eligible_for` | If set to `fast`, the response is filtered to only datasets where `eligible_for_fast` is `true` | diff --git a/backend/app/api/routes/evaluations/dataset.py b/backend/app/api/routes/evaluations/dataset.py index 202774da2..cd04ec32b 100644 --- a/backend/app/api/routes/evaluations/dataset.py +++ b/backend/app/api/routes/evaluations/dataset.py @@ -17,14 +17,19 @@ from app.core.cloud import get_cloud_storage from app.crud.evaluations import ( get_dataset_by_id, +) +from app.crud.evaluations import ( list_datasets as list_evaluation_datasets, ) from app.crud.evaluations.dataset import delete_dataset as delete_dataset_crud from app.models.evaluation import DatasetUploadResponse, EvaluationDataset from app.services.evaluations import ( - upload_dataset as upload_evaluation_dataset, + is_dataset_fast_eligible, validate_csv_file, ) +from app.services.evaluations import ( + upload_dataset as upload_evaluation_dataset, +) from app.utils import ( APIResponse, load_description, @@ -39,13 +44,15 @@ def _dataset_to_response( dataset: EvaluationDataset, signed_url: str | None = None ) -> DatasetUploadResponse: """Convert a dataset model to a DatasetUploadResponse.""" + original_items = dataset.dataset_metadata.get("original_items_count", 0) return DatasetUploadResponse( dataset_id=dataset.id, dataset_name=dataset.name, description=dataset.description, total_items=dataset.dataset_metadata.get("total_items_count", 0), - original_items=dataset.dataset_metadata.get("original_items_count", 0), + original_items=original_items, duplication_factor=dataset.dataset_metadata.get("duplication_factor", 1), + eligible_for_fast=is_dataset_fast_eligible(original_items_count=original_items), langfuse_dataset_id=dataset.langfuse_dataset_id, object_store_url=dataset.object_store_url, signed_url=signed_url, @@ -104,6 +111,15 @@ def list_datasets( default=50, ge=1, le=100, description="Maximum number of datasets to return" ), offset: int = Query(default=0, ge=0, description="Number of datasets to skip"), + eligible_for: str + | None = Query( + default=None, + description=( + "If 'fast', return only datasets eligible for run_mode='fast' " + "(unique-row count within EVAL_FAST_MAX_UNIQUE_ROWS)." + ), + enum=["fast"], + ), ) -> APIResponse[list[DatasetUploadResponse]]: """List evaluation datasets.""" datasets = list_evaluation_datasets( @@ -112,11 +128,12 @@ def list_datasets( project_id=auth_context.project_.id, limit=limit, offset=offset, + eligible_for_fast=(eligible_for == "fast"), ) - return APIResponse.success_response( - data=[_dataset_to_response(dataset) for dataset in datasets] - ) + dataset_responses = [_dataset_to_response(dataset) for dataset in datasets] + + return APIResponse.success_response(data=dataset_responses) @router.get( diff --git a/backend/app/api/routes/evaluations/evaluation.py b/backend/app/api/routes/evaluations/evaluation.py index 591f5d985..0c9c941f9 100644 --- a/backend/app/api/routes/evaluations/evaluation.py +++ b/backend/app/api/routes/evaluations/evaluation.py @@ -3,6 +3,7 @@ import logging from uuid import UUID +from asgi_correlation_id import correlation_id from fastapi import ( APIRouter, Body, @@ -12,13 +13,14 @@ ) from app.api.deps import AuthContextDep, SessionDep +from app.api.permissions import Permission, require_permission from app.crud.evaluations import list_evaluation_runs as list_evaluation_runs_crud from app.crud.evaluations.core import group_traces_by_question_id -from app.models.evaluation import EvaluationRunPublic -from app.api.permissions import Permission, require_permission +from app.models.evaluation import EvaluationRunPublic, RunModeEnum from app.services.evaluations import ( get_evaluation_with_scores, start_evaluation, + validate_and_start_fast_evaluation, ) from app.utils import ( APIResponse, @@ -45,8 +47,32 @@ def evaluate( ), config_id: UUID = Body(..., description="Stored config ID"), config_version: int = Body(..., ge=1, description="Stored config version"), + run_mode: RunModeEnum = Body( + default=RunModeEnum.BATCH, + description="Execution mode: 'batch' (default) or 'fast'", + ), ) -> APIResponse[EvaluationRunPublic]: """Start an evaluation run.""" + logger.info( + f"[evaluate] Starting evaluation | run_mode={run_mode.value} | " + f"experiment_name={experiment_name} | dataset_id={dataset_id} | " + f"org_id={auth_context.organization_.id} | " + f"project_id={auth_context.project_.id}" + ) + + if run_mode == RunModeEnum.FAST: + eval_run = validate_and_start_fast_evaluation( + session=session, + dataset_id=dataset_id, + run_name=experiment_name, + config_id=config_id, + config_version=config_version, + organization_id=auth_context.organization_.id, + project_id=auth_context.project_.id, + trace_id=correlation_id.get() or "N/A", + ) + return APIResponse.success_response(data=eval_run) + eval_run = start_evaluation( session=session, dataset_id=dataset_id, diff --git a/backend/app/celery/celery_app.py b/backend/app/celery/celery_app.py index b78e7b15f..a4031b202 100644 --- a/backend/app/celery/celery_app.py +++ b/backend/app/celery/celery_app.py @@ -164,6 +164,7 @@ def initialize_worker(**_) -> None: include=[ "app.celery.tasks.job_execution", "app.celery.tasks.notifications", + "app.celery.tasks.evaluation_fast", ], ) @@ -186,6 +187,12 @@ def initialize_worker(**_) -> None: routing_key="low", queue_arguments={"x-max-priority": 1}, ), + Queue( + "evaluations", + exchange=default_exchange, + routing_key="evaluations", + queue_arguments={"x-max-priority": 6}, + ), Queue("cron", exchange=default_exchange, routing_key="cron"), Queue("default", exchange=default_exchange, routing_key="default"), ), diff --git a/backend/app/celery/tasks/evaluation_fast.py b/backend/app/celery/tasks/evaluation_fast.py new file mode 100644 index 000000000..e9b1a626e --- /dev/null +++ b/backend/app/celery/tasks/evaluation_fast.py @@ -0,0 +1,56 @@ +""" +Celery task for the synchronous (fast) text-evaluation pipeline. + +This module hosts the single orchestrator task per fast evaluation run. The +heavy lifting lives in `app/services/evaluations/fast.py`; this task is a thin +shim that sets the correlation id, attaches the OTel parent context, and +delegates. + +See `Fast Evaluation SRD.md` for the design (queue, retries, idempotency). +""" + +import logging + +from celery import Task, current_task + +from app.celery.celery_app import celery_app +from app.celery.tasks.job_execution import _run_with_otel_parent, _set_trace +from app.celery.utils import gevent_timeout +from app.core.config import settings + +logger = logging.getLogger(__name__) + +# Sentinel correlation id used when no trace id is propagated from the +# enqueueing request. Matches the codebase-wide "N/A" default (see +# app/core/logger.py and app/celery/utils.py). +DEFAULT_TRACE_ID = "N/A" + + +@celery_app.task(bind=True, queue="evaluations", priority=6) +@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_evaluation_fast") +def run_evaluation_fast( + self: Task, eval_run_id: int, trace_id: str = DEFAULT_TRACE_ID +) -> None: + """Run the fast evaluation pipeline for one EvaluationRun. + + Idempotency: each stage is skipped on retry when its `batch_job` marker is + already set on the EvaluationRun, so Celery redelivery never re-calls + OpenAI for work that already succeeded. + + Args: + eval_run_id: ID of the EvaluationRun (run_mode="fast"). + trace_id: Correlation id from the enqueueing request, propagated into + the worker for log correlation. + """ + from app.services.evaluations.fast import execute_fast_evaluation + + _set_trace(trace_id) + logger.info( + f"[run_evaluation_fast] Starting fast evaluation task | " + f"eval_run_id={eval_run_id} | task_id={current_task.request.id}" + ) + + return _run_with_otel_parent( + self, + lambda: execute_fast_evaluation(eval_run_id=eval_run_id), + ) diff --git a/backend/app/celery/utils.py b/backend/app/celery/utils.py index 288cba7c4..f643c7df6 100644 --- a/backend/app/celery/utils.py +++ b/backend/app/celery/utils.py @@ -198,6 +198,22 @@ def start_tts_result_processing( return task_id +def start_fast_evaluation(eval_run_id: int, trace_id: str = "N/A") -> str: + """Enqueue the run_evaluation_fast orchestrator task for one EvaluationRun.""" + from app.celery.tasks.evaluation_fast import run_evaluation_fast + + task_id = _enqueue_with_trace_context( + run_evaluation_fast, + eval_run_id=eval_run_id, + trace_id=trace_id, + ) + logger.info( + f"[start_fast_evaluation] Enqueued fast eval | " + f"eval_run_id={eval_run_id} | task_id={task_id}" + ) + return task_id + + def get_task_status(task_id: str) -> Dict[str, Any]: result = AsyncResult(task_id) return { diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 3088d8002..c14aec846 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -182,6 +182,12 @@ def AWS_S3_BUCKET(self) -> str: DOC_TRANSFORMATION_PENDING_THRESHOLD_MINUTES: int = 30 PENDING_JOB_QUERY_TIMEOUT_MS: int = 1000 + # Fast evaluation (run_mode="fast") configuration. + # See "Fast Evaluation SRD.md" for the full design rationale. + EVAL_FAST_MAX_UNIQUE_ROWS: int = 10 + EVAL_FAST_FAILURE_THRESHOLD: float = 0.5 + EVAL_FAST_API_CONCURRENCY: int = 10 + @computed_field # type: ignore[prop-decorator] @property def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int: diff --git a/backend/app/crud/evaluations/__init__.py b/backend/app/crud/evaluations/__init__.py index a5824c0a2..2c0bd8f48 100644 --- a/backend/app/crud/evaluations/__init__.py +++ b/backend/app/crud/evaluations/__init__.py @@ -25,6 +25,11 @@ calculate_cosine_similarity, start_embedding_batch, ) +from app.crud.evaluations.fast import ( + JOB_TYPE_EMBEDDING_FAST, + JOB_TYPE_EVALUATION_FAST, + run_fast_evaluation, +) from app.crud.evaluations.langfuse import ( create_langfuse_dataset_run, fetch_trace_scores_from_langfuse, @@ -65,6 +70,10 @@ "upload_csv_to_object_store", # Batch "start_evaluation_batch", + # Fast eval + "JOB_TYPE_EMBEDDING_FAST", + "JOB_TYPE_EVALUATION_FAST", + "run_fast_evaluation", # Processing "check_and_process_evaluation", "poll_all_pending_evaluations", diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py index f3d51c56d..53b17dc3d 100644 --- a/backend/app/crud/evaluations/core.py +++ b/backend/app/crud/evaluations/core.py @@ -13,6 +13,7 @@ from app.crud.evaluations.langfuse import fetch_trace_scores_from_langfuse from app.crud.evaluations.score import EvaluationScore from app.models import EvaluationRun, EvaluationRunUpdate +from app.models.evaluation import RunModeEnum from app.models.config.config import ConfigTag from app.models.llm.request import ConfigBlob, LLMCallConfig from app.models.stt_evaluation import EvaluationType @@ -62,6 +63,7 @@ def create_evaluation_run( config_version: int, organization_id: int, project_id: int, + run_mode: RunModeEnum | str = RunModeEnum.BATCH, ) -> EvaluationRun: """ Create a new evaluation run record in the database. @@ -75,10 +77,18 @@ def create_evaluation_run( config_version: Version number of the config organization_id: Organization ID project_id: Project ID + run_mode: Execution mode (RunModeEnum.BATCH default, or RunModeEnum.FAST) Returns: The created EvaluationRun instance + + Raises: + ValueError: If run_mode is not a valid RunModeEnum value. """ + # Validate against the allowed run modes before hitting the DB CHECK + # constraint, so callers get a clear error instead of an IntegrityError. + run_mode_value = RunModeEnum(run_mode).value + eval_run = EvaluationRun( run_name=run_name, dataset_name=dataset_name, @@ -87,6 +97,7 @@ def create_evaluation_run( config_id=config_id, config_version=config_version, status="pending", + run_mode=run_mode_value, organization_id=organization_id, project_id=project_id, inserted_at=now(), diff --git a/backend/app/crud/evaluations/dataset.py b/backend/app/crud/evaluations/dataset.py index 0aaf5a2e9..d0cf29930 100644 --- a/backend/app/crud/evaluations/dataset.py +++ b/backend/app/crud/evaluations/dataset.py @@ -12,10 +12,12 @@ from typing import Any from fastapi import HTTPException +from sqlalchemy import Integer, cast from sqlalchemy.exc import IntegrityError from sqlmodel import Session, select from app.core.cloud.storage import CloudStorage +from app.core.config import settings from app.core.storage_utils import ( generate_timestamped_filename, upload_to_object_store, @@ -179,6 +181,7 @@ def list_datasets( project_id: int, limit: int = 50, offset: int = 0, + eligible_for_fast: bool = False, ) -> list[EvaluationDataset]: """ List all evaluation datasets for an organization and project with pagination. @@ -189,6 +192,11 @@ def list_datasets( project_id: Project ID limit: Maximum number of datasets to return (default 50) offset: Number of datasets to skip (for pagination) + eligible_for_fast: When True, restrict to datasets whose unique-row count + (dataset_metadata.original_items_count) is at or below + settings.EVAL_FAST_MAX_UNIQUE_ROWS. Applied in SQL so pagination + counts only eligible rows. Datasets missing the metadata key are + excluded (NULL fails the comparison). Returns: List of EvaluationDataset objects, ordered by most recent first @@ -198,7 +206,19 @@ def list_datasets( .where(EvaluationDataset.organization_id == organization_id) .where(EvaluationDataset.project_id == project_id) .where(EvaluationDataset.type == EvaluationType.TEXT.value) - .order_by(EvaluationDataset.inserted_at.desc()) + ) + + if eligible_for_fast: + original_items_count = cast( + EvaluationDataset.dataset_metadata["original_items_count"].astext, + Integer, + ) + statement = statement.where( + original_items_count <= settings.EVAL_FAST_MAX_UNIQUE_ROWS + ) + + statement = ( + statement.order_by(EvaluationDataset.inserted_at.desc()) .limit(limit) .offset(offset) ) diff --git a/backend/app/crud/evaluations/fast.py b/backend/app/crud/evaluations/fast.py new file mode 100644 index 000000000..c58ca8970 --- /dev/null +++ b/backend/app/crud/evaluations/fast.py @@ -0,0 +1,858 @@ +"""Fast evaluation orchestration (run_mode="fast"). + +Synchronous text-eval path: makes Responses + Embeddings calls in parallel from +a single Celery task and persists per-stage units to S3. Each stage is skipped +on retry if its `batch_job` row already exists. + + Stage 1 — Responses unit: evaluation_run.batch_job_id + Stage 2 — Embeddings unit: evaluation_run.embedding_batch_job_id + Stage 3 — Score + trace + cost (no marker; each step is idempotent) + Stage 4 — Mark completed + Stage 5 — Persist score unit (summary + per-trace) via the shared + save_score helper, so the cached trace unit (score_trace_url) + exists immediately and the read path (trace view / resync / + grouped export) mirrors the batch path without racing Langfuse + ingestion. + +See `Fast Evaluation SRD.md` for the full design. +""" + +import logging +import random +import time +from collections.abc import Callable +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, TypeVar + +import numpy as np +import openai +from langfuse import Langfuse +from openai import OpenAI +from sqlmodel import Session + +from app.core.cloud.storage import get_cloud_storage +from app.core.config import settings +from app.core.storage_utils import ( + load_json_from_object_store, + upload_jsonl_to_object_store, +) +from app.crud.evaluations.batch import fetch_dataset_items +from app.crud.evaluations.core import ( + resolve_model_from_config, + save_score, + update_evaluation_run, +) +from app.crud.evaluations.cost import attach_cost +from app.crud.evaluations.embeddings import ( + EMBEDDING_MODEL, + calculate_cosine_similarity, +) +from app.crud.evaluations.langfuse import ( + create_langfuse_dataset_run, + update_traces_with_cosine_scores, +) +from app.crud.evaluations.score import EvaluationScore, TraceData, TraceScore +from app.crud.job import create_batch_job, get_batch_job +from app.models import EvaluationRun, EvaluationRunUpdate +from app.models.batch_job import BatchJobCreate +from app.models.evaluation import RunModeEnum +from app.models.llm.request import TextLLMParams + +logger = logging.getLogger(__name__) + +_T = TypeVar("_T") + + +# job_type discriminators on batch_job for the two fast-path stages. The row's +# presence + raw_output_url is what marks a stage as already done on retry. +JOB_TYPE_EVALUATION_FAST = "evaluation_fast" +JOB_TYPE_EMBEDDING_FAST = "embedding_fast" + + +# Per-call retry policy for Stage 1 / Stage 2. +_RETRY_MAX_ATTEMPTS = 5 +_RETRY_BASE_DELAY_SECONDS = 1.0 +_RETRY_MAX_DELAY_SECONDS = 30.0 + +_RETRYABLE_OPENAI_ERRORS: tuple[type[Exception], ...] = ( + openai.RateLimitError, + openai.APITimeoutError, + openai.APIConnectionError, + openai.InternalServerError, +) + + +def _sleep_with_backoff(attempt: int) -> None: + """Exponential backoff with full jitter, capped at _RETRY_MAX_DELAY_SECONDS.""" + base = min( + _RETRY_BASE_DELAY_SECONDS * (2 ** (attempt - 1)), _RETRY_MAX_DELAY_SECONDS + ) + delay = random.uniform(0, base) + time.sleep(delay) + + +def _call_with_retry(label: str, fn: Callable[[], _T]) -> _T: + """Call `fn()`, retrying transient OpenAI errors; permanent errors fail fast.""" + for attempt in range(1, _RETRY_MAX_ATTEMPTS + 1): + try: + return fn() + except _RETRYABLE_OPENAI_ERRORS as exc: + if attempt == _RETRY_MAX_ATTEMPTS: + logger.warning( + f"[_call_with_retry] Exhausted retries | label={label} | " + f"attempt={attempt} | error={exc}" + ) + raise + logger.info( + f"[_call_with_retry] Transient error, retrying | label={label} | " + f"attempt={attempt} | error={exc}" + ) + _sleep_with_backoff(attempt) + except openai.OpenAIError as exc: + logger.warning( + f"[_call_with_retry] Permanent error, no retry | label={label} | " + f"error={exc}" + ) + raise + # Unreachable: the loop always returns or raises. Keeps mypy happy. + raise RuntimeError(f"_call_with_retry exited without result for {label}") + + +def _build_responses_params( + *, + config: TextLLMParams, + question: str, +) -> dict[str, Any]: + """Build params for one Responses call, mirroring the batch path's body shape.""" + params: dict[str, Any] = { + "model": config.model, + "instructions": config.instructions, + "input": question, + } + + if "temperature" in config.model_fields_set: + params["temperature"] = config.temperature + + if config.reasoning: + params["reasoning"] = {"effort": config.reasoning} + + if config.knowledge_base_ids: + params["tools"] = [ + { + "type": "file_search", + "vector_store_ids": config.knowledge_base_ids, + "max_num_results": config.max_num_results or 20, + } + ] + + return params + + +def _field(obj: Any, name: str, default: Any = None) -> Any: + """Read a field from an object or dict (SDK object vs test dict), with a default.""" + if obj is None: + return default + if isinstance(obj, dict): + return obj.get(name, default) + return getattr(obj, name, default) + + +def _extract_response_text(response: Any) -> str: + """Extract generated text, preferring `output_text` then walking `output`.""" + output_text = _field(response, "output_text") + if output_text: + return output_text + + output = _field(response, "output") + if not output: + return "" + + for item in output: + if _field(item, "type") != "message": + continue + for content in _field(item, "content") or []: + if _field(content, "type") == "output_text": + text = _field(content, "text") + if text: + return text + return "" + + +def _usage_to_dict(usage: Any) -> dict[str, int]: + """Normalize an OpenAI usage object into the cost layer's dict shape.""" + return { + "input_tokens": int(_field(usage, "input_tokens", 0) or 0), + "output_tokens": int(_field(usage, "output_tokens", 0) or 0), + "total_tokens": int(_field(usage, "total_tokens", 0) or 0), + } + + +def _responses_call_for_item( + *, + openai_client: OpenAI, + config: TextLLMParams, + item: dict[str, Any], +) -> dict[str, Any]: + """Run one Responses call for a dataset item, in the batch path's per-item shape.""" + item_id = item["id"] + question = item["input"].get("question", "") if item.get("input") else "" + ground_truth = ( + item["expected_output"].get("answer", "") if item.get("expected_output") else "" + ) + question_id = (item.get("metadata") or {}).get("question_id") + + if not question: + return { + "item_id": item_id, + "question": "", + "generated_output": "ERROR: missing question in dataset item", + "ground_truth": ground_truth, + "response_id": None, + "usage": None, + "question_id": question_id, + "failed": True, + } + + params = _build_responses_params(config=config, question=question) + + try: + response = _call_with_retry( + label=f"responses.create:{item_id}", + fn=lambda: openai_client.responses.create(**params), + ) + except openai.OpenAIError as exc: + logger.warning( + f"[_responses_call_for_item] Item failed | item_id={item_id} | error={exc}" + ) + return { + "item_id": item_id, + "question": question, + "generated_output": f"ERROR: {exc}", + "ground_truth": ground_truth, + "response_id": None, + "usage": None, + "question_id": question_id, + "failed": True, + } + + return { + "item_id": item_id, + "question": question, + "generated_output": _extract_response_text(response), + "ground_truth": ground_truth, + "response_id": getattr(response, "id", None), + "usage": _usage_to_dict(getattr(response, "usage", None)), + "question_id": question_id, + "failed": False, + } + + +def _embedding_call_for_pair( + *, + openai_client: OpenAI, + embedding_model: str, + item_id: str, + output_text: str, + ground_truth: str, +) -> dict[str, Any]: + """Embed an (output, ground_truth) pair; `failed=True` on a terminal failure.""" + if not output_text or not ground_truth: + return { + "item_id": item_id, + "output_embedding": None, + "ground_truth_embedding": None, + "usage": None, + "failed": True, + "error": "empty output or ground_truth", + } + + try: + response = _call_with_retry( + label=f"embeddings.create:{item_id}", + fn=lambda: openai_client.embeddings.create( + model=embedding_model, + input=[output_text, ground_truth], + encoding_format="float", + ), + ) + except openai.OpenAIError as exc: + logger.warning( + f"[_embedding_call_for_pair] Item failed | item_id={item_id} | error={exc}" + ) + return { + "item_id": item_id, + "output_embedding": None, + "ground_truth_embedding": None, + "usage": None, + "failed": True, + "error": str(exc), + } + + data = _field(response, "data") or [] + if len(data) < 2: + return { + "item_id": item_id, + "output_embedding": None, + "ground_truth_embedding": None, + "usage": None, + "failed": True, + "error": f"expected 2 embeddings, got {len(data)}", + } + + output_embedding: list[float] | None = None + ground_truth_embedding: list[float] | None = None + for emb in data: + index = _field(emb, "index") + vector = _field(emb, "embedding") + if index == 0: + output_embedding = vector + elif index == 1: + ground_truth_embedding = vector + + usage_obj = _field(response, "usage") + usage_dict: dict[str, int] = { + "prompt_tokens": int(_field(usage_obj, "prompt_tokens", 0) or 0), + "total_tokens": int(_field(usage_obj, "total_tokens", 0) or 0), + } + + return { + "item_id": item_id, + "output_embedding": output_embedding, + "ground_truth_embedding": ground_truth_embedding, + "usage": usage_dict, + "failed": output_embedding is None or ground_truth_embedding is None, + } + + +def _is_failure_threshold_breached(*, failed_rows: int, total_rows: int) -> bool: + """True if the failed-row fraction exceeds EVAL_FAST_FAILURE_THRESHOLD.""" + if total_rows == 0: + return False + return (failed_rows / total_rows) > settings.EVAL_FAST_FAILURE_THRESHOLD + + +def _upload_unit_to_s3( + *, + session: Session, + project_id: int, + eval_run_id: int, + filename: str, + results: list[dict[str, Any]], +) -> str | None: + """Upload a stage unit (responses or embeddings) as JSON to S3.""" + storage = get_cloud_storage(session=session, project_id=project_id) + return upload_jsonl_to_object_store( + storage=storage, + results=results, + filename=filename, + subdirectory=f"evaluations/fast/{eval_run_id}", + format="json", + ) + + +def _load_unit_from_s3( + *, session: Session, project_id: int, url: str +) -> list[dict[str, Any]]: + """Load a stage unit back from S3. Raises if the unit cannot be loaded.""" + storage = get_cloud_storage(session=session, project_id=project_id) + data = load_json_from_object_store(storage=storage, url=url) + if data is None: + raise RuntimeError(f"Failed to load fast eval unit from S3 | url={url}") + if not isinstance(data, list): + raise RuntimeError( + f"Fast eval unit at {url} is not a list | type={type(data).__name__}" + ) + return data + + +def _stage1_responses( + *, + session: Session, + openai_client: OpenAI, + eval_run: EvaluationRun, + config: TextLLMParams, + dataset_items: list[dict[str, Any]], + log_prefix: str, +) -> tuple[EvaluationRun, list[dict[str, Any]]]: + """Stage 1 — one Responses call per dataset item. + + Skipped on retry if `eval_run.batch_job_id` is set; the unit is reloaded from S3. + + Concurrency assumption: this check-then-create is safe only against + *sequential* redelivery (a retry after the previous attempt fully stopped), + where the `batch_job_id` marker guarantees we never re-call OpenAI for work + that already succeeded. We rely on a single in-flight execution per + EvaluationRun — i.e. the broker visibility timeout is tuned above the task's + runtime so Celery does not redeliver while a worker is still running. If + that assumption is ever broken, two workers could both pass this guard; add + an advisory lock / atomic claim here before introducing concurrent delivery. + """ + if eval_run.batch_job_id: + existing = get_batch_job(session=session, batch_job_id=eval_run.batch_job_id) + if existing and existing.raw_output_url: + logger.info( + f"[_stage1_responses] {log_prefix} Skipping stage 1 (already done) | " + f"batch_job_id={existing.id}" + ) + results = _load_unit_from_s3( + session=session, + project_id=eval_run.project_id, + url=existing.raw_output_url, + ) + return eval_run, results + + logger.info( + f"[_stage1_responses] {log_prefix} Running stage 1 | " + f"items={len(dataset_items)} | model={config.model} | " + f"concurrency={settings.EVAL_FAST_API_CONCURRENCY}" + ) + + results: list[dict[str, Any]] = [] + max_workers = max(1, min(settings.EVAL_FAST_API_CONCURRENCY, len(dataset_items))) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit( + _responses_call_for_item, + openai_client=openai_client, + config=config, + item=item, + ): item["id"] + for item in dataset_items + } + for future in as_completed(futures): + results.append(future.result()) + + failed_count = sum(1 for r in results if r.get("failed")) + logger.info( + f"[_stage1_responses] {log_prefix} Stage 1 finished | " + f"total={len(results)} | failed={failed_count}" + ) + + if _is_failure_threshold_breached( + failed_rows=failed_count, total_rows=len(results) + ): + raise RuntimeError( + f"Fast eval Stage 1 exceeded failure threshold | " + f"failed={failed_count}/{len(results)} | " + f"threshold={settings.EVAL_FAST_FAILURE_THRESHOLD}" + ) + + raw_output_url = _upload_unit_to_s3( + session=session, + project_id=eval_run.project_id, + eval_run_id=eval_run.id, + filename=f"responses_{eval_run.id}.json", + results=results, + ) + + # Aggregate usage for the batch_job summary. + summed_usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + for r in results: + usage = r.get("usage") or {} + for k in summed_usage: + summed_usage[k] += int(usage.get(k, 0) or 0) + + batch_job = create_batch_job( + session=session, + batch_job_create=BatchJobCreate( + provider="openai", + job_type=JOB_TYPE_EVALUATION_FAST, + config={ + "endpoint": "/v1/responses", + "run_mode": RunModeEnum.FAST.value, + "model": config.model, + "usage": summed_usage, + }, + raw_output_url=raw_output_url, + total_items=len(results), + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + ), + ) + + # batch_job_id / total_items aren't on EvaluationRunUpdate; set them directly. + eval_run.batch_job_id = batch_job.id + eval_run.total_items = len(results) + eval_run = update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(), + ) + + return eval_run, results + + +def _stage2_embeddings( + *, + session: Session, + openai_client: OpenAI, + eval_run: EvaluationRun, + response_results: list[dict[str, Any]], + log_prefix: str, +) -> tuple[EvaluationRun, list[dict[str, Any]]]: + """Stage 2 — embed each (output, ground_truth) pair; skipped on retry if done.""" + if eval_run.embedding_batch_job_id: + existing = get_batch_job( + session=session, batch_job_id=eval_run.embedding_batch_job_id + ) + if existing and existing.raw_output_url: + logger.info( + f"[_stage2_embeddings] {log_prefix} Skipping stage 2 (already done) | " + f"batch_job_id={existing.id}" + ) + embeddings = _load_unit_from_s3( + session=session, + project_id=eval_run.project_id, + url=existing.raw_output_url, + ) + return eval_run, embeddings + + # Only embed items that succeeded in Stage 1. + embed_candidates = [r for r in response_results if not r.get("failed")] + logger.info( + f"[_stage2_embeddings] {log_prefix} Running stage 2 | " + f"items={len(embed_candidates)} | model={EMBEDDING_MODEL} | " + f"concurrency={settings.EVAL_FAST_API_CONCURRENCY}" + ) + + embedding_results: list[dict[str, Any]] = [] + max_workers = max( + 1, min(settings.EVAL_FAST_API_CONCURRENCY, len(embed_candidates) or 1) + ) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit( + _embedding_call_for_pair, + openai_client=openai_client, + embedding_model=EMBEDDING_MODEL, + item_id=r["item_id"], + output_text=r.get("generated_output", ""), + ground_truth=r.get("ground_truth", ""), + ): r["item_id"] + for r in embed_candidates + } + for future in as_completed(futures): + embedding_results.append(future.result()) + + failed_count = sum(1 for r in embedding_results if r.get("failed")) + # Threshold is over the whole dataset: Stage 1 failures count as failures too. + total_failures = failed_count + sum(1 for r in response_results if r.get("failed")) + logger.info( + f"[_stage2_embeddings] {log_prefix} Stage 2 finished | " + f"total={len(embedding_results)} | failed={failed_count}" + ) + + if _is_failure_threshold_breached( + failed_rows=total_failures, total_rows=len(response_results) + ): + raise RuntimeError( + f"Fast eval Stage 2 exceeded failure threshold | " + f"failed={total_failures}/{len(response_results)} | " + f"threshold={settings.EVAL_FAST_FAILURE_THRESHOLD}" + ) + + raw_output_url = _upload_unit_to_s3( + session=session, + project_id=eval_run.project_id, + eval_run_id=eval_run.id, + filename=f"embeddings_{eval_run.id}.json", + results=embedding_results, + ) + + summed_usage = {"prompt_tokens": 0, "total_tokens": 0} + for r in embedding_results: + usage = r.get("usage") or {} + for k in summed_usage: + summed_usage[k] += int(usage.get(k, 0) or 0) + + batch_job = create_batch_job( + session=session, + batch_job_create=BatchJobCreate( + provider="openai", + job_type=JOB_TYPE_EMBEDDING_FAST, + config={ + "endpoint": "/v1/embeddings", + "run_mode": RunModeEnum.FAST.value, + "embedding_model": EMBEDDING_MODEL, + "usage": summed_usage, + }, + raw_output_url=raw_output_url, + total_items=len(embedding_results), + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + ), + ) + + eval_run = update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(embedding_batch_job_id=batch_job.id), + ) + + return eval_run, embedding_results + + +def _stage3_score_and_trace( + *, + session: Session, + eval_run: EvaluationRun, + langfuse: Langfuse, + response_results: list[dict[str, Any]], + embedding_results: list[dict[str, Any]], + log_prefix: str, +) -> tuple[EvaluationRun, EvaluationScore]: + """Stage 3 — compute cosine, create Langfuse traces, attach costs. + + Returns the run plus the full score unit (summary_scores + per-trace + records) in the exact shape `fetch_trace_scores_from_langfuse` produces for + the batch path, so the caller can persist it via `save_score`. + + No stage marker; each step is idempotent (deterministic cosine, Langfuse + dedupes on the observe key, attach_cost overwrites per stage). + """ + logger.info( + f"[_stage3_score_and_trace] {log_prefix} Computing cosine + creating traces" + ) + + item_id_to_pair = { + r["item_id"]: r for r in embedding_results if not r.get("failed") + } + + model = resolve_model_from_config(session=session, eval_run=eval_run) + trace_id_mapping = create_langfuse_dataset_run( + langfuse=langfuse, + dataset_name=eval_run.dataset_name, + run_name=eval_run.run_name, + results=response_results, + model=model, + ) + + # Per-item cosine scores keyed on Langfuse trace_id (for Langfuse updates) + # and on item_id (for building the persisted per-trace records below). + per_item_scores: list[dict[str, Any]] = [] + item_id_to_score: dict[str, float] = {} + similarities: list[float] = [] + for response in response_results: + item_id = response["item_id"] + embedding_pair = item_id_to_pair.get(item_id) + trace_id = trace_id_mapping.get(item_id) + if not embedding_pair or not trace_id: + continue + if ( + embedding_pair.get("output_embedding") is None + or embedding_pair.get("ground_truth_embedding") is None + ): + continue + cosine = calculate_cosine_similarity( + embedding_pair["output_embedding"], + embedding_pair["ground_truth_embedding"], + ) + similarities.append(cosine) + item_id_to_score[item_id] = cosine + per_item_scores.append({"trace_id": trace_id, "cosine_similarity": cosine}) + + if per_item_scores: + try: + update_traces_with_cosine_scores( + langfuse=langfuse, per_item_scores=per_item_scores + ) + except Exception as exc: + # Score-update failures don't fail the run (score lives in eval_run.score). + logger.warning( + f"[_stage3_score_and_trace] {log_prefix} " + f"Failed to update Langfuse traces with scores | error={exc}", + exc_info=True, + ) + + # Aggregate similarity stats, in the batch path's summary_scores shape. + if similarities: + sim_array = np.array(similarities) + avg = float(np.mean(sim_array)) + std = float(np.std(sim_array)) + else: + avg = 0.0 + std = 0.0 + + score_payload = { + "summary_scores": [ + { + "name": "Cosine Similarity", + "avg": round(avg, 2), + "std": round(std, 2), + "total_pairs": len(similarities), + "data_type": "NUMERIC", + } + ] + } + + # Attach response- and embedding-stage costs (attach_cost is idempotent per stage). + if response_results: + attach_cost( + session=session, + eval_run=eval_run, + log_prefix=log_prefix, + response_model=model, + response_results=response_results, + ) + + # attach_cost expects the raw OpenAI batch shape; rebuild it from embedding_results. + if embedding_results: + embedding_raw = [ + { + "response": { + "body": { + "usage": r.get("usage") + or {"prompt_tokens": 0, "total_tokens": 0} + } + } + } + for r in embedding_results + if not r.get("failed") + ] + if embedding_raw: + attach_cost( + session=session, + eval_run=eval_run, + log_prefix=log_prefix, + embedding_model=EMBEDDING_MODEL, + embedding_raw_results=embedding_raw, + ) + + # Build the per-trace records in the same shape the batch path persists (via + # fetch_trace_scores_from_langfuse). One record per response that has a + # Langfuse trace; the cosine score is attached when its embedding succeeded. + traces: list[TraceData] = [] + for response in response_results: + item_id = response["item_id"] + trace_id = trace_id_mapping.get(item_id) + if not trace_id: + continue + trace_scores: list[TraceScore] = [] + cosine = item_id_to_score.get(item_id) + if cosine is not None: + trace_scores.append( + { + "name": "Cosine Similarity", + "value": round(cosine, 2), + "data_type": "NUMERIC", + "comment": ( + "Cosine similarity between generated output and " + "ground truth embeddings" + ), + } + ) + traces.append( + { + "trace_id": trace_id, + "question": response.get("question", ""), + "llm_answer": response.get("generated_output", ""), + "ground_truth_answer": response.get("ground_truth", ""), + "question_id": response.get("question_id"), + "scores": trace_scores, + } + ) + + # Persist cost here; the score unit (summary + traces) is persisted by the + # caller via save_score so it lands in S3 (score_trace_url) like the batch path. + eval_run = update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(cost=eval_run.cost), + ) + + score: EvaluationScore = { + "summary_scores": score_payload["summary_scores"], + "traces": traces, + } + return eval_run, score + + +def run_fast_evaluation( + *, + session: Session, + openai_client: OpenAI, + langfuse: Langfuse, + eval_run: EvaluationRun, + config: TextLLMParams, +) -> EvaluationRun: + """Run the full fast-eval pipeline for one evaluation_run. + + Called from the `run_evaluation_fast` task. Stages are skipped on retry when + their batch_job marker is set. Raises on terminal failure (run marked failed). + """ + log_prefix = ( + f"[org={eval_run.organization_id}]" + f"[project={eval_run.project_id}]" + f"[eval={eval_run.id}]" + ) + logger.info(f"[run_fast_evaluation] {log_prefix} Starting fast evaluation pipeline") + + if eval_run.status == "pending": + eval_run = update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(status="processing"), + ) + + dataset_items = fetch_dataset_items( + langfuse=langfuse, dataset_name=eval_run.dataset_name + ) + if not dataset_items: + raise ValueError( + f"Dataset '{eval_run.dataset_name}' returned no items for fast eval" + ) + + # Stage 1 + eval_run, response_results = _stage1_responses( + session=session, + openai_client=openai_client, + eval_run=eval_run, + config=config, + dataset_items=dataset_items, + log_prefix=log_prefix, + ) + + # Stage 2 + eval_run, embedding_results = _stage2_embeddings( + session=session, + openai_client=openai_client, + eval_run=eval_run, + response_results=response_results, + log_prefix=log_prefix, + ) + + # Stage 3 + eval_run, score = _stage3_score_and_trace( + session=session, + eval_run=eval_run, + langfuse=langfuse, + response_results=response_results, + embedding_results=embedding_results, + log_prefix=log_prefix, + ) + + # Stage 4 + eval_run = update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(status="completed"), + ) + + # Stage 5 — persist the score unit (traces to S3, summary to DB) via the + # shared batch helper so the read path serves the cached unit instead of + # racing Langfuse ingestion. + saved = save_score( + eval_run_id=eval_run.id, + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + score=score, + ) + if saved is not None: + eval_run = saved + eval_run.score = score + + logger.info( + f"[run_fast_evaluation] {log_prefix} Fast evaluation completed | " + f"total_items={eval_run.total_items}" + ) + return eval_run diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index ae5bf3cf0..177f200c8 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -99,6 +99,7 @@ EvaluationRunCreate, EvaluationRunPublic, EvaluationRunUpdate, + RunModeEnum, ) from .feature_flag import ( FeatureFlag, diff --git a/backend/app/models/evaluation.py b/backend/app/models/evaluation.py index 8a95ff054..14c396420 100644 --- a/backend/app/models/evaluation.py +++ b/backend/app/models/evaluation.py @@ -1,4 +1,5 @@ from datetime import datetime +from enum import Enum from typing import TYPE_CHECKING, Any, Optional from uuid import UUID @@ -16,6 +17,11 @@ from .project import Project +class RunModeEnum(str, Enum): + BATCH = "batch" + FAST = "fast" + + class DatasetItem(BaseModel): """Model for a single dataset item (Q&A pair).""" @@ -47,6 +53,15 @@ class DatasetUploadResponse(BaseModel): signed_url: str | None = Field( None, description="A signed URL for downloading the dataset" ) + eligible_for_fast: bool = Field( + False, + description=( + "Size predicate only: True if the dataset's unique-row count is at or " + "below the fast-mode threshold (settings.EVAL_FAST_MAX_UNIQUE_ROWS). " + "Other fast-mode prerequisites (e.g. a Langfuse dataset id, a text " + "OpenAI config) are not reflected here." + ), + ) class EvaluationResult(BaseModel): @@ -192,6 +207,12 @@ class EvaluationRun(SQLModel, table=True): __table_args__ = ( Index("idx_eval_run_status_org", "status", "organization_id"), Index("idx_eval_run_status_project", "status", "project_id"), + UniqueConstraint( + "organization_id", + "project_id", + "run_name", + name="uq_evaluation_run_org_project_run_name", + ), ) id: int = SQLField( @@ -284,6 +305,12 @@ class EvaluationRun(SQLModel, table=True): "comment": "Evaluation status (pending, processing, completed, failed)" }, ) + run_mode: str = SQLField( + default=RunModeEnum.BATCH.value, + max_length=10, + nullable=False, + sa_column_kwargs={"comment": "Execution mode: batch or fast"}, + ) object_store_url: str | None = SQLField( default=None, description="Object store URL of processed evaluation results for future reference", @@ -421,6 +448,7 @@ class EvaluationRunPublic(SQLModel): batch_job_id: int | None embedding_batch_job_id: int | None status: str + run_mode: str object_store_url: str | None score_trace_url: str | None total_items: int diff --git a/backend/app/services/evaluations/__init__.py b/backend/app/services/evaluations/__init__.py index 92d88fe0b..3776e60c4 100644 --- a/backend/app/services/evaluations/__init__.py +++ b/backend/app/services/evaluations/__init__.py @@ -5,6 +5,11 @@ get_evaluation_with_scores, start_evaluation, ) +from app.services.evaluations.fast import ( + execute_fast_evaluation, + is_dataset_fast_eligible, + validate_and_start_fast_evaluation, +) from app.services.evaluations.validators import ( ALLOWED_EXTENSIONS, ALLOWED_MIME_TYPES, diff --git a/backend/app/services/evaluations/evaluation.py b/backend/app/services/evaluations/evaluation.py index 9f78b9e37..ca969323a 100644 --- a/backend/app/services/evaluations/evaluation.py +++ b/backend/app/services/evaluations/evaluation.py @@ -4,8 +4,11 @@ from uuid import UUID from fastapi import HTTPException +from sqlalchemy.exc import IntegrityError from sqlmodel import Session +from app.core.cloud.storage import get_cloud_storage +from app.core.storage_utils import load_json_from_object_store from app.crud.evaluations import ( create_evaluation_run, fetch_trace_scores_from_langfuse, @@ -15,15 +18,92 @@ save_score, start_evaluation_batch, ) -from app.models.evaluation import EvaluationRun -from app.models.llm.request import TextLLMParams, STTLLMParams, TTSLLMParams +from app.models.evaluation import EvaluationRun, RunModeEnum +from app.models.llm.request import STTLLMParams, TextLLMParams, TTSLLMParams from app.services.llm.providers import LLMProvider from app.utils import get_langfuse_client, get_openai_client -from app.core.cloud.storage import get_cloud_storage -from app.core.storage_utils import load_json_from_object_store logger = logging.getLogger(__name__) +# Error code surfaced in HTTPException.detail when a run_name collides with the +# (organization_id, project_id, run_name) unique constraint. Shared by the batch +# and fast paths so both return an identical 409 contract. +ERR_RUN_NAME_ALREADY_EXISTS = "run_name_already_exists" + +# Name of the (organization_id, project_id, run_name) unique constraint on +# evaluation_run. Used to distinguish a run_name collision (-> 409) from any +# other IntegrityError (e.g. FK violations), which must not be masked. +_RUN_NAME_UNIQUE_CONSTRAINT = "uq_evaluation_run_org_project_run_name" + + +def _is_run_name_conflict(error: IntegrityError) -> bool: + """True only when the IntegrityError is the run_name uniqueness violation.""" + constraint_name = getattr( + getattr(error.orig, "diag", None), "constraint_name", None + ) + if constraint_name: + return constraint_name == _RUN_NAME_UNIQUE_CONSTRAINT + # Fall back to matching the constraint name in the raw error text for + # drivers/dialects that don't expose structured diagnostics. + return _RUN_NAME_UNIQUE_CONSTRAINT in str(error.orig or error) + + +def create_evaluation_run_or_409( + *, + session: Session, + run_name: str, + dataset_name: str, + dataset_id: int, + config_id: UUID, + config_version: int, + organization_id: int, + project_id: int, + run_mode: RunModeEnum | str = RunModeEnum.BATCH, + log_context: str, +) -> EvaluationRun: + """Create an EvaluationRun, translating a duplicate-run_name collision into 409. + + The (organization_id, project_id, run_name) unique constraint guards against + double-click / client-retry races; on collision we roll back and raise a 409 + instead of leaking the IntegrityError. + """ + try: + return create_evaluation_run( + session=session, + run_name=run_name, + dataset_name=dataset_name, + dataset_id=dataset_id, + config_id=config_id, + config_version=config_version, + organization_id=organization_id, + project_id=project_id, + run_mode=run_mode, + ) + except IntegrityError as exc: + session.rollback() + # Only a run_name collision becomes a 409; any other IntegrityError + # (e.g. a dataset/config FK violation) is a real failure and must not be + # masked as a duplicate-name error. + if not _is_run_name_conflict(exc): + logger.error( + f"[{log_context}] IntegrityError creating run | run_name={run_name} | " + f"org_id={organization_id} | project_id={project_id} | error={exc}", + exc_info=True, + ) + raise + logger.warning( + f"[{log_context}] Duplicate run_name | run_name={run_name} | " + f"org_id={organization_id} | project_id={project_id}" + ) + raise HTTPException( + status_code=409, + detail=( + f"{ERR_RUN_NAME_ALREADY_EXISTS}: a run with name '{run_name}' " + "already exists for this organization and project. Pick a new " + "run_name or fetch the existing run via GET /evaluations." + ), + ) + def start_evaluation( session: Session, @@ -129,7 +209,7 @@ def start_evaluation( ) # Step 3: Create EvaluationRun record with config references - eval_run = create_evaluation_run( + eval_run = create_evaluation_run_or_409( session=session, run_name=experiment_name, dataset_name=dataset.name, @@ -138,6 +218,7 @@ def start_evaluation( config_version=config_version, organization_id=organization_id, project_id=project_id, + log_context="start_evaluation", ) # Step 4: Start the batch evaluation diff --git a/backend/app/services/evaluations/fast.py b/backend/app/services/evaluations/fast.py new file mode 100644 index 000000000..6be44b5f7 --- /dev/null +++ b/backend/app/services/evaluations/fast.py @@ -0,0 +1,279 @@ +"""Fast evaluation orchestration service. + +This is the only place that decides whether a /evaluations request enters the +fast-eval path. It also hosts the worker-side entry point invoked by the +`run_evaluation_fast` Celery task. + +See `Fast Evaluation SRD.md` for the full design. +""" + +import logging +from uuid import UUID + +from fastapi import HTTPException +from sqlmodel import Session + +from app.celery.utils import start_fast_evaluation as enqueue_fast_evaluation +from app.core.config import settings +from app.core.db import engine +from app.crud.evaluations import ( + get_dataset_by_id, + resolve_evaluation_config, + run_fast_evaluation, +) +from app.crud.evaluations.core import update_evaluation_run +from app.models.evaluation import EvaluationRun, EvaluationRunUpdate, RunModeEnum +from app.models.llm.request import TextLLMParams +from app.services.evaluations.evaluation import create_evaluation_run_or_409 +from app.services.llm.providers import LLMProvider +from app.utils import get_langfuse_client, get_openai_client + +logger = logging.getLogger(__name__) + + +# Error codes surfaced in HTTPException.detail so the UI can localize/branch. +ERR_CONFIG_TYPE_UNSUPPORTED = "config_type_unsupported" +ERR_DATASET_TOO_LARGE_FOR_FAST = "dataset_too_large_for_fast" + + +def is_dataset_fast_eligible(*, original_items_count: int) -> bool: + """A dataset is eligible for fast mode when its unique-row count is within cap.""" + return original_items_count <= settings.EVAL_FAST_MAX_UNIQUE_ROWS + + +def validate_and_start_fast_evaluation( + *, + session: Session, + dataset_id: int, + run_name: str, + config_id: UUID, + config_version: int, + organization_id: int, + project_id: int, + trace_id: str = "N/A", +) -> EvaluationRun: + """Validate + create + dispatch a fast evaluation run. + + Validation (in order): + 1. Dataset exists and has a Langfuse id. + 2. Config resolves to a text-type OpenAI config. + 3. Dataset's original_items_count <= EVAL_FAST_MAX_UNIQUE_ROWS. + 4. (organization_id, project_id, run_name) is unique — enforced by the DB + constraint; a collision is translated to 409 by the shared helper. + + On success the function creates the EvaluationRun row with + `run_mode="fast"`, `status="processing"`, and enqueues the orchestrator + task. The caller (route) returns the row immediately. + """ + logger.info( + f"[validate_and_start_fast_evaluation] Starting fast eval | " + f"run_name={run_name} | dataset_id={dataset_id} | " + f"org_id={organization_id} | project_id={project_id}" + ) + + # 1. Dataset must exist + have a Langfuse id (same as batch path). + dataset = get_dataset_by_id( + session=session, + dataset_id=dataset_id, + organization_id=organization_id, + project_id=project_id, + ) + if not dataset: + raise HTTPException( + status_code=404, + detail=( + f"Dataset {dataset_id} not found or not accessible to this " + "organization/project" + ), + ) + if not dataset.langfuse_dataset_id: + raise HTTPException( + status_code=400, + detail=( + f"Dataset {dataset_id} has no Langfuse dataset id; cannot run " + "evaluation." + ), + ) + + # 2. Config must resolve and be a text OpenAI config. + config_blob, error = resolve_evaluation_config( + session=session, + config_id=config_id, + config_version=config_version, + project_id=project_id, + ) + if error or config_blob is None: + raise HTTPException( + status_code=400, + detail=f"Failed to resolve config: {error}", + ) + if config_blob.completion.provider != LLMProvider.OPENAI: + raise HTTPException( + status_code=422, + detail="Only 'openai' provider is supported for evaluation configs", + ) + if config_blob.completion.type != "text": + raise HTTPException( + status_code=422, + detail=ERR_CONFIG_TYPE_UNSUPPORTED, + ) + + # 3. Dataset must be small enough for fast eval. + original_items_count = (dataset.dataset_metadata or {}).get("original_items_count") + if original_items_count is None: + raise HTTPException( + status_code=422, + detail=( + f"{ERR_DATASET_TOO_LARGE_FOR_FAST}: dataset {dataset_id} is " + "missing 'original_items_count' metadata; cannot verify it has at " + f"most {settings.EVAL_FAST_MAX_UNIQUE_ROWS} unique rows for fast mode." + ), + ) + if not is_dataset_fast_eligible(original_items_count=original_items_count): + raise HTTPException( + status_code=422, + detail=( + f"{ERR_DATASET_TOO_LARGE_FOR_FAST}: dataset has " + f"{original_items_count} unique rows; fast mode requires at most " + f"{settings.EVAL_FAST_MAX_UNIQUE_ROWS}." + ), + ) + + # 4. Create the run; the shared helper translates a duplicate run_name into 409. + eval_run = create_evaluation_run_or_409( + session=session, + run_name=run_name, + dataset_name=dataset.name, + dataset_id=dataset_id, + config_id=config_id, + config_version=config_version, + organization_id=organization_id, + project_id=project_id, + run_mode=RunModeEnum.FAST, + log_context="validate_and_start_fast_evaluation", + ) + + # Flip to processing before dispatching the task so the GET endpoint + # reflects the correct state immediately. + eval_run = update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(status="processing"), + ) + + # Dispatch the orchestrator. If enqueue fails, mark the run as failed so it + # doesn't linger in `processing` forever. + try: + enqueue_fast_evaluation(eval_run_id=eval_run.id, trace_id=trace_id) + except Exception as exc: + logger.error( + f"[validate_and_start_fast_evaluation] Failed to enqueue task | " + f"eval_run_id={eval_run.id} | error={exc}", + exc_info=True, + ) + update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate( + status="failed", + error_message=f"Failed to enqueue fast eval task: {exc}", + ), + ) + raise HTTPException( + status_code=500, + detail="Failed to enqueue fast evaluation task", + ) + + return eval_run + + +def execute_fast_evaluation(*, eval_run_id: int) -> None: + """Worker-side entry point: run the full fast-eval pipeline. + + Called from the `run_evaluation_fast` Celery task. Opens its own DB + session so the task is self-contained, then resolves config + clients and + delegates to `run_fast_evaluation` (CRUD). + + On terminal failure the run is marked `failed` with a descriptive + error_message and the exception is re-raised so Celery records the failure + (no automatic retry for this task — stage-level idempotency is the retry + surface). + """ + logger.info(f"[execute_fast_evaluation] Starting | eval_run_id={eval_run_id}") + + with Session(engine) as session: + eval_run = session.get(EvaluationRun, eval_run_id) + if eval_run is None: + logger.error( + f"[execute_fast_evaluation] EvaluationRun not found | " + f"eval_run_id={eval_run_id}" + ) + raise ValueError(f"EvaluationRun {eval_run_id} not found") + + if eval_run.run_mode != RunModeEnum.FAST.value: + logger.error( + f"[execute_fast_evaluation] Wrong run_mode for fast task | " + f"eval_run_id={eval_run_id} | run_mode={eval_run.run_mode}" + ) + raise ValueError( + f"EvaluationRun {eval_run_id} has run_mode={eval_run.run_mode}, " + f"expected 'fast'" + ) + + if eval_run.status == "completed": + logger.info( + f"[execute_fast_evaluation] Run already completed, skipping | " + f"eval_run_id={eval_run_id}" + ) + return + + try: + config_blob, error = resolve_evaluation_config( + session=session, + config_id=eval_run.config_id, + config_version=eval_run.config_version, + project_id=eval_run.project_id, + ) + if error or config_blob is None: + raise ValueError(f"Failed to resolve config: {error}") + + text_params = TextLLMParams.model_validate(config_blob.completion.params) + + openai_client = get_openai_client( + session=session, + org_id=eval_run.organization_id, + project_id=eval_run.project_id, + ) + langfuse_client = get_langfuse_client( + session=session, + org_id=eval_run.organization_id, + project_id=eval_run.project_id, + ) + + run_fast_evaluation( + session=session, + openai_client=openai_client, + langfuse=langfuse_client, + eval_run=eval_run, + config=text_params, + ) + + except Exception as exc: + logger.error( + f"[execute_fast_evaluation] Run failed | " + f"eval_run_id={eval_run_id} | error={exc}", + exc_info=True, + ) + # Re-fetch the row in case our session was rolled back. + session.rollback() + failed_run = session.get(EvaluationRun, eval_run_id) + if failed_run is not None: + update_evaluation_run( + session=session, + eval_run=failed_run, + update=EvaluationRunUpdate( + status="failed", + error_message=f"Fast eval failed: {exc}", + ), + ) + raise diff --git a/backend/app/tests/api/routes/test_evaluation_fast.py b/backend/app/tests/api/routes/test_evaluation_fast.py new file mode 100644 index 000000000..2fe153954 --- /dev/null +++ b/backend/app/tests/api/routes/test_evaluation_fast.py @@ -0,0 +1,762 @@ +"""Tests for the fast (synchronous) evaluation path. + +Covers FR-1 through FR-15 from the Fast Evaluation SRD. External boundaries +(OpenAI, Langfuse, S3, Celery dispatch) are mocked; the DB is real (`db` +fixture). +""" + +import random +from collections.abc import Iterator +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import openai +import pytest +from fastapi.testclient import TestClient +from sqlmodel import Session + +from app.crud.evaluations.fast import ( + JOB_TYPE_EMBEDDING_FAST, + JOB_TYPE_EVALUATION_FAST, + _call_with_retry, + _is_failure_threshold_breached, + _stage1_responses, + _stage2_embeddings, + run_fast_evaluation, +) +from app.models import Config, EvaluationDataset, EvaluationRun +from app.models.batch_job import BatchJob +from app.models.evaluation import RunModeEnum +from app.models.llm.request import ( + ConfigBlob, + KaapiCompletionConfig, + TextLLMParams, +) +from app.tests.utils.auth import TestAuthContext +from app.tests.utils.test_data import ( + create_test_config, + create_test_evaluation_dataset, +) + + +def _api_error(resp_body: dict) -> str: + """Pull the human-readable error string out of an APIResponse failure body.""" + return str(resp_body.get("error") or resp_body.get("detail") or resp_body).lower() + + +@pytest.fixture(autouse=True) +def _seeded_random() -> Iterator[None]: + """Make jitter / random.choice deterministic so tests are repeatable.""" + random.seed(0) + yield + + +# --------------------------------------------------------------------------- +# Pure-function helpers (no FastAPI, no DB) +# --------------------------------------------------------------------------- + + +class TestFailureThreshold: + """`_is_failure_threshold_breached` controls run-level fail-fast.""" + + def test_returns_false_when_total_is_zero(self) -> None: + assert _is_failure_threshold_breached(failed_rows=0, total_rows=0) is False + + def test_returns_true_above_threshold(self) -> None: + # default EVAL_FAST_FAILURE_THRESHOLD = 0.5 + assert _is_failure_threshold_breached(failed_rows=6, total_rows=10) is True + + def test_returns_false_at_threshold(self) -> None: + # 0.5 / 1.0 is NOT greater-than the threshold, so do not breach + assert _is_failure_threshold_breached(failed_rows=5, total_rows=10) is False + + +class TestCallWithRetry: + """FR-8: transient OpenAI errors retry; permanent ones do not.""" + + def test_returns_immediately_on_success(self) -> None: + result = _call_with_retry(label="t", fn=lambda: "ok") + assert result == "ok" + + def test_retries_on_transient_then_succeeds(self, monkeypatch) -> None: + # Avoid sleeping — make backoff a no-op. + monkeypatch.setattr("app.crud.evaluations.fast.time.sleep", lambda *_: None) + + calls = {"n": 0} + + def flaky(): + calls["n"] += 1 + if calls["n"] < 3: + # APIConnectionError needs a request; pass a minimal object. + raise openai.APIConnectionError(request=MagicMock()) + return "ok" + + result = _call_with_retry(label="t", fn=flaky) + assert result == "ok" + assert calls["n"] == 3 + + def test_does_not_retry_on_permanent_error(self) -> None: + def bad(): + # AuthenticationError is a non-retryable OpenAIError subclass. + raise openai.AuthenticationError( + message="bad key", response=MagicMock(), body=None + ) + + with pytest.raises(openai.AuthenticationError): + _call_with_retry(label="t", fn=bad) + + +# --------------------------------------------------------------------------- +# Route validation: POST /evaluations with run_mode=fast (FR-1..FR-5, FR-15) +# --------------------------------------------------------------------------- + + +@pytest.fixture +def _patch_dispatch() -> Iterator[MagicMock]: + """Stub the Celery dispatch so tests don't actually enqueue work.""" + with patch( + "app.services.evaluations.fast.enqueue_fast_evaluation", + return_value="fake-task-id", + ) as m: + yield m + + +def _make_fast_eligible_dataset( + *, + db: Session, + user_api_key: TestAuthContext, + original_items_count: int = 3, +) -> EvaluationDataset: + return create_test_evaluation_dataset( + db=db, + organization_id=user_api_key.organization_id, + project_id=user_api_key.project_id, + original_items_count=original_items_count, + duplication_factor=1, + ) + + +def _make_text_openai_config(db: Session, project_id: int) -> Config: + """Create a stored text-OpenAI Kaapi config (eligible for fast eval).""" + return create_test_config( + db=db, + project_id=project_id, + use_kaapi_schema=True, + ) + + +class TestFastEvaluationRoute: + """End-to-end validation on POST /evaluations with run_mode='fast'.""" + + def test_fr4_accepts_eligible_request_and_dispatches( + self, + client: TestClient, + user_api_key_header: dict[str, str], + db: Session, + user_api_key: TestAuthContext, + _patch_dispatch, + ): + """FR-4: eligible request returns processing + dispatches orchestrator.""" + dataset = _make_fast_eligible_dataset(db=db, user_api_key=user_api_key) + config = _make_text_openai_config(db, user_api_key.project_id) + + resp = client.post( + "/api/v1/evaluations", + json={ + "experiment_name": "fr4-fast-run", + "dataset_id": dataset.id, + "config_id": str(config.id), + "config_version": 1, + "run_mode": "fast", + }, + headers=user_api_key_header, + ) + + assert resp.status_code == 200, resp.text + body = resp.json()["data"] + assert body["run_mode"] == "fast" + assert body["status"] == "processing" + assert body["run_name"] == "fr4-fast-run" + _patch_dispatch.assert_called_once() + + # DB state matches the response. + run = db.get(EvaluationRun, body["id"]) + assert run is not None + assert run.run_mode == RunModeEnum.FAST.value + assert run.status == "processing" + + def test_fr2_rejects_dataset_with_too_many_unique_rows( + self, + client: TestClient, + user_api_key_header: dict[str, str], + db: Session, + user_api_key: TestAuthContext, + _patch_dispatch, + ): + """FR-2: >10 unique rows → 422 dataset_too_large_for_fast.""" + # default EVAL_FAST_MAX_UNIQUE_ROWS = 10; create 11 unique rows + dataset = _make_fast_eligible_dataset( + db=db, user_api_key=user_api_key, original_items_count=11 + ) + config = _make_text_openai_config(db, user_api_key.project_id) + + resp = client.post( + "/api/v1/evaluations", + json={ + "experiment_name": "fr2-fast-run", + "dataset_id": dataset.id, + "config_id": str(config.id), + "config_version": 1, + "run_mode": "fast", + }, + headers=user_api_key_header, + ) + + assert resp.status_code == 422 + # The route wraps HTTPException.detail into APIResponse.error. + error_str = _api_error(resp.json()) + assert "dataset_too_large_for_fast" in error_str + assert "11" in error_str # surfaces actual unique-row count + _patch_dispatch.assert_not_called() + + def test_fr1_rejects_non_text_config( + self, + client: TestClient, + user_api_key_header: dict[str, str], + db: Session, + user_api_key: TestAuthContext, + _patch_dispatch, + ): + """FR-1: non-text config for fast mode → 422 config_type_unsupported. + + Build a stored config whose completion.type is not 'text'. The current + config factories produce text configs by default, so we patch the + resolved blob to look like an STT config. + """ + dataset = _make_fast_eligible_dataset(db=db, user_api_key=user_api_key) + config = _make_text_openai_config(db, user_api_key.project_id) + + # Patch resolve_evaluation_config to return an STT-type blob. + fake_blob = ConfigBlob( + completion=KaapiCompletionConfig( + provider="openai", + type="stt", + params={"model": "whisper-1"}, + ) + ) + + with patch( + "app.services.evaluations.fast.resolve_evaluation_config", + return_value=(fake_blob, None), + ): + resp = client.post( + "/api/v1/evaluations", + json={ + "experiment_name": "fr1-fast-run", + "dataset_id": dataset.id, + "config_id": str(config.id), + "config_version": 1, + "run_mode": "fast", + }, + headers=user_api_key_header, + ) + + assert resp.status_code == 422 + assert "config_type_unsupported" in _api_error(resp.json()) + _patch_dispatch.assert_not_called() + + def test_fr3_rejects_duplicate_run_name( + self, + client: TestClient, + user_api_key_header: dict[str, str], + db: Session, + user_api_key: TestAuthContext, + _patch_dispatch, + ): + """FR-3: duplicate (org, project, run_name) → 409, no second dispatch.""" + dataset = _make_fast_eligible_dataset(db=db, user_api_key=user_api_key) + config = _make_text_openai_config(db, user_api_key.project_id) + payload = { + "experiment_name": "fr3-dup-run", + "dataset_id": dataset.id, + "config_id": str(config.id), + "config_version": 1, + "run_mode": "fast", + } + + first = client.post( + "/api/v1/evaluations", json=payload, headers=user_api_key_header + ) + assert first.status_code == 200, first.text + + second = client.post( + "/api/v1/evaluations", json=payload, headers=user_api_key_header + ) + assert second.status_code == 409 + assert "run_name_already_exists" in _api_error(second.json()) + # First call dispatched; second must not have. + assert _patch_dispatch.call_count == 1 + + def test_fr15_get_evaluation_returns_run_mode( + self, + client: TestClient, + user_api_key_header: dict[str, str], + db: Session, + user_api_key: TestAuthContext, + ): + """FR-15: GET /evaluations/{id} surfaces `run_mode` for both modes.""" + dataset = _make_fast_eligible_dataset(db=db, user_api_key=user_api_key) + config = _make_text_openai_config(db, user_api_key.project_id) + eval_run = EvaluationRun( + run_name="fr15-existing", + dataset_name=dataset.name, + dataset_id=dataset.id, + config_id=config.id, + config_version=1, + status="completed", + run_mode=RunModeEnum.FAST.value, + total_items=3, + organization_id=user_api_key.organization_id, + project_id=user_api_key.project_id, + ) + db.add(eval_run) + db.commit() + db.refresh(eval_run) + + resp = client.get( + f"/api/v1/evaluations/{eval_run.id}", headers=user_api_key_header + ) + assert resp.status_code == 200 + assert resp.json()["data"]["run_mode"] == "fast" + + +# --------------------------------------------------------------------------- +# Dataset listing eligibility filter (FR-5) +# --------------------------------------------------------------------------- + + +class TestDatasetListEligibleForFast: + def test_fr5_filters_to_fast_eligible_only( + self, + client: TestClient, + user_api_key_header: dict[str, str], + db: Session, + user_api_key: TestAuthContext, + ): + """FR-5: eligible_for=fast filters list to datasets with ≤10 unique rows.""" + eligible = _make_fast_eligible_dataset( + db=db, user_api_key=user_api_key, original_items_count=5 + ) + ineligible = _make_fast_eligible_dataset( + db=db, user_api_key=user_api_key, original_items_count=20 + ) + + resp = client.get( + "/api/v1/evaluations/datasets", + params={"eligible_for": "fast"}, + headers=user_api_key_header, + ) + assert resp.status_code == 200 + data = resp.json()["data"] + ids = {d["dataset_id"] for d in data} + assert eligible.id in ids + assert ineligible.id not in ids + assert all(d["eligible_for_fast"] is True for d in data) + + +# --------------------------------------------------------------------------- +# Stage skipping on retry (FR-6, FR-7) +# --------------------------------------------------------------------------- + + +def _fake_openai_response(text: str = "answer", item_id: str = "item-1"): + """Mimic the SDK's response.responses.create return shape.""" + return SimpleNamespace( + id=f"resp_{item_id}", + output_text=text, + output=[], + usage=SimpleNamespace(input_tokens=10, output_tokens=20, total_tokens=30), + ) + + +def _fake_embedding_response(): + """Mimic openai.embeddings.create return shape (2 vectors).""" + return SimpleNamespace( + data=[ + SimpleNamespace(index=0, embedding=[1.0, 0.0, 0.0]), + SimpleNamespace(index=1, embedding=[1.0, 0.0, 0.0]), + ], + usage=SimpleNamespace(prompt_tokens=5, total_tokens=5), + ) + + +class TestStageSkipping: + """FR-6 / FR-7: stages skip on retry when their batch_job marker is set.""" + + def test_fr6_stage1_skips_when_batch_job_id_already_set( + self, + db: Session, + user_api_key: TestAuthContext, + ): + """Pre-existing batch_job → Stage 1 does not call the Responses API.""" + dataset = _make_fast_eligible_dataset(db=db, user_api_key=user_api_key) + config = _make_text_openai_config(db, user_api_key.project_id) + eval_run = EvaluationRun( + run_name="fr6-stage1-skip", + dataset_name=dataset.name, + dataset_id=dataset.id, + config_id=config.id, + config_version=1, + status="processing", + run_mode=RunModeEnum.FAST.value, + organization_id=user_api_key.organization_id, + project_id=user_api_key.project_id, + ) + db.add(eval_run) + db.commit() + db.refresh(eval_run) + + # Pre-create a batch_job marker as if Stage 1 had already completed. + existing = BatchJob( + provider="openai", + job_type=JOB_TYPE_EVALUATION_FAST, + config={}, + raw_output_url="s3://bucket/responses_x.json", + total_items=1, + organization_id=user_api_key.organization_id, + project_id=user_api_key.project_id, + ) + db.add(existing) + db.commit() + db.refresh(existing) + + eval_run.batch_job_id = existing.id + db.add(eval_run) + db.commit() + db.refresh(eval_run) + + cached = [ + { + "item_id": "item-1", + "question": "q", + "generated_output": "a", + "ground_truth": "a", + "response_id": "resp_1", + "usage": {"input_tokens": 1, "output_tokens": 1, "total_tokens": 2}, + "question_id": 1, + "failed": False, + } + ] + + fake_openai = MagicMock() + with patch("app.crud.evaluations.fast._load_unit_from_s3", return_value=cached): + _, results = _stage1_responses( + session=db, + openai_client=fake_openai, + eval_run=eval_run, + config=TextLLMParams(model="gpt-4o", instructions="x"), + dataset_items=[ + { + "id": "item-1", + "input": {"question": "q"}, + "expected_output": {"answer": "a"}, + "metadata": {}, + } + ], + log_prefix="[t]", + ) + + # Skip path returns the cached unit and never calls the OpenAI client. + assert results == cached + fake_openai.responses.create.assert_not_called() + + def test_fr7_stage2_skips_when_embedding_batch_job_id_already_set( + self, + db: Session, + user_api_key: TestAuthContext, + ): + """Pre-existing embedding batch_job → Stage 2 does not call the Embeddings API.""" + dataset = _make_fast_eligible_dataset(db=db, user_api_key=user_api_key) + config = _make_text_openai_config(db, user_api_key.project_id) + eval_run = EvaluationRun( + run_name="fr7-stage2-skip", + dataset_name=dataset.name, + dataset_id=dataset.id, + config_id=config.id, + config_version=1, + status="processing", + run_mode=RunModeEnum.FAST.value, + organization_id=user_api_key.organization_id, + project_id=user_api_key.project_id, + ) + db.add(eval_run) + db.commit() + db.refresh(eval_run) + + marker = BatchJob( + provider="openai", + job_type=JOB_TYPE_EMBEDDING_FAST, + config={}, + raw_output_url="s3://bucket/embeddings_x.json", + total_items=1, + organization_id=user_api_key.organization_id, + project_id=user_api_key.project_id, + ) + db.add(marker) + db.commit() + db.refresh(marker) + + eval_run.embedding_batch_job_id = marker.id + db.add(eval_run) + db.commit() + db.refresh(eval_run) + + cached = [ + { + "item_id": "item-1", + "output_embedding": [1.0, 0.0], + "ground_truth_embedding": [1.0, 0.0], + "usage": {"prompt_tokens": 5, "total_tokens": 5}, + "failed": False, + } + ] + + fake_openai = MagicMock() + with patch("app.crud.evaluations.fast._load_unit_from_s3", return_value=cached): + _, results = _stage2_embeddings( + session=db, + openai_client=fake_openai, + eval_run=eval_run, + response_results=[ + { + "item_id": "item-1", + "question": "q", + "generated_output": "a", + "ground_truth": "a", + "failed": False, + } + ], + log_prefix="[t]", + ) + + assert results == cached + fake_openai.embeddings.create.assert_not_called() + + +# --------------------------------------------------------------------------- +# End-to-end orchestrator pipeline with mocked externals (FR-9..FR-14) +# --------------------------------------------------------------------------- + + +@pytest.fixture +def _fast_pipeline_mocks(): + """Patch external boundaries used inside `run_fast_evaluation`. + + OpenAI client returns fixed responses/embeddings, Langfuse returns trace + ids, S3 upload returns a URL, and `attach_cost` no-ops so the test does + not need a model_config row. + """ + with ( + patch("app.crud.evaluations.fast.fetch_dataset_items") as mock_fetch_items, + patch( + "app.crud.evaluations.fast._upload_unit_to_s3", + side_effect=lambda **kw: f"s3://bucket/{kw['filename']}", + ), + patch( + "app.crud.evaluations.fast.create_langfuse_dataset_run" + ) as mock_create_traces, + patch( + "app.crud.evaluations.fast.update_traces_with_cosine_scores" + ) as mock_update_traces, + patch( + "app.crud.evaluations.fast.resolve_model_from_config", return_value="gpt-4o" + ), + patch("app.crud.evaluations.fast.attach_cost") as mock_attach_cost, + ): + mock_fetch_items.return_value = [ + { + "id": "item-1", + "input": {"question": "Q1"}, + "expected_output": {"answer": "A1"}, + "metadata": {"question_id": 1}, + }, + { + "id": "item-2", + "input": {"question": "Q2"}, + "expected_output": {"answer": "A2"}, + "metadata": {"question_id": 2}, + }, + ] + mock_create_traces.return_value = { + "item-1": "trace-1", + "item-2": "trace-2", + } + yield SimpleNamespace( + fetch_items=mock_fetch_items, + create_traces=mock_create_traces, + update_traces=mock_update_traces, + attach_cost=mock_attach_cost, + ) + + +class TestFastPipelineEndToEnd: + """Run the orchestrator with mocked external boundaries (FR-9..FR-13).""" + + def test_pipeline_completes_with_scores_and_writes_batch_jobs( + self, + db: Session, + user_api_key: TestAuthContext, + _fast_pipeline_mocks, + ): + dataset = _make_fast_eligible_dataset(db=db, user_api_key=user_api_key) + config = _make_text_openai_config(db, user_api_key.project_id) + eval_run = EvaluationRun( + run_name="pipeline-happy-path", + dataset_name=dataset.name, + dataset_id=dataset.id, + config_id=config.id, + config_version=1, + status="pending", + run_mode=RunModeEnum.FAST.value, + organization_id=user_api_key.organization_id, + project_id=user_api_key.project_id, + ) + db.add(eval_run) + db.commit() + db.refresh(eval_run) + + fake_openai = MagicMock() + fake_openai.responses.create.side_effect = lambda **_: _fake_openai_response( + text="LLM answer", item_id="x" + ) + fake_openai.embeddings.create.return_value = _fake_embedding_response() + fake_langfuse = MagicMock() + + # save_score opens its own Session(engine), which can't see this test's + # rolled-back transaction. Mirror its S3-success path against the test + # session so the persisted state (summary in DB, traces in S3) is + # observable here. (In production the worker's commits are visible across + # connections, so the real save_score works unchanged.) + def _fake_save_score(*, eval_run_id, score, **_): + run = db.get(EvaluationRun, eval_run_id) + run.score = {"summary_scores": score["summary_scores"]} + run.score_trace_url = f"s3://bucket/traces_{eval_run_id}.json" + db.add(run) + db.commit() + db.refresh(run) + return run + + with patch( + "app.crud.evaluations.fast.save_score", side_effect=_fake_save_score + ): + result = run_fast_evaluation( + session=db, + openai_client=fake_openai, + langfuse=fake_langfuse, + eval_run=eval_run, + config=TextLLMParams(model="gpt-4o", instructions="be helpful"), + ) + + # FR-11/FR-14: status completed, summary cosine ≈ 1.0 for identical vectors + assert result.status == "completed" + assert result.score is not None + cosine = result.score["summary_scores"][0] + assert cosine["name"] == "Cosine Similarity" + assert cosine["avg"] == pytest.approx(1.0, abs=0.01) + assert cosine["total_pairs"] == 2 + + # Stage markers exist (FR-6/FR-7 invariant + FR-9 — no llm_call rows). + assert result.batch_job_id is not None + assert result.embedding_batch_job_id is not None + responses_job = db.get(BatchJob, result.batch_job_id) + embeddings_job = db.get(BatchJob, result.embedding_batch_job_id) + assert responses_job.job_type == JOB_TYPE_EVALUATION_FAST + assert responses_job.raw_output_url is not None + assert embeddings_job.job_type == JOB_TYPE_EMBEDDING_FAST + assert embeddings_job.raw_output_url is not None + + # FR-12: Langfuse traces created and per-trace scores attached. + assert _fast_pipeline_mocks.create_traces.called + assert _fast_pipeline_mocks.update_traces.called + + # FR-13: attach_cost called twice (response + embedding stages). + assert _fast_pipeline_mocks.attach_cost.call_count == 2 + + # Cached trace unit is persisted like the batch path so the read path + # (trace view / resync / grouped export) never has to hit Langfuse. + run = db.get(EvaluationRun, result.id) + assert run.score_trace_url is not None + # Full unit (summary + per-trace records) is surfaced on the return. + traces = result.score["traces"] + assert len(traces) == 2 + by_trace = {t["trace_id"]: t for t in traces} + assert {"trace-1", "trace-2"} == set(by_trace) + sample = by_trace["trace-1"] + assert sample["question"] == "Q1" + assert sample["ground_truth_answer"] == "A1" + assert sample["scores"][0]["name"] == "Cosine Similarity" + assert sample["scores"][0]["value"] == pytest.approx(1.0, abs=0.01) + + +# --------------------------------------------------------------------------- +# Failure-threshold short-circuit (FR-14) +# --------------------------------------------------------------------------- + + +class TestFailureThresholdInPipeline: + def test_fr14_stage1_raises_when_failure_ratio_exceeds_threshold( + self, + db: Session, + user_api_key: TestAuthContext, + ): + """FR-14: Stage 1 raises RuntimeError when failure ratio > threshold. + + The outer orchestrator (run_fast_evaluation / execute_fast_evaluation) + catches the RuntimeError and marks the run failed; the structural + guarantee under test is that Stage 1 fails fast instead of proceeding + to Stage 2 with a mostly-broken response set. + """ + dataset = _make_fast_eligible_dataset(db=db, user_api_key=user_api_key) + config = _make_text_openai_config(db, user_api_key.project_id) + eval_run = EvaluationRun( + run_name="fr14-fail-threshold", + dataset_name=dataset.name, + dataset_id=dataset.id, + config_id=config.id, + config_version=1, + status="processing", + run_mode=RunModeEnum.FAST.value, + organization_id=user_api_key.organization_id, + project_id=user_api_key.project_id, + ) + db.add(eval_run) + db.commit() + db.refresh(eval_run) + + # Make all OpenAI Responses calls fail with a permanent error so retries + # short-circuit and every item gets failed=True. With every item + # failing, the failure fraction (1.0) is well above the 0.5 threshold. + fake_openai = MagicMock() + fake_openai.responses.create.side_effect = openai.AuthenticationError( + message="bad key", response=MagicMock(), body=None + ) + + dataset_items = [ + { + "id": f"item-{i}", + "input": {"question": f"Q{i}"}, + "expected_output": {"answer": f"A{i}"}, + "metadata": {}, + } + for i in range(4) + ] + + with pytest.raises(RuntimeError, match="failure threshold"): + _stage1_responses( + session=db, + openai_client=fake_openai, + eval_run=eval_run, + config=TextLLMParams(model="gpt-4o", instructions="x"), + dataset_items=dataset_items, + log_prefix="[t]", + )