Skip to content
Open
6 changes: 4 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ fastapi run --reload app/main.py
# Run pre-commit hooks
uv run pre-commit run --all-files

# Generate database migration (rev-id should be latest existing revision ID + 1)
alembic revision --autogenerate -m "Description" --rev-id 040
# Generate database migration.
# Compute <next_rev_id> at runtime as the latest existing revision ID + 1,
# zero-padded to 3 digits (check the highest NNN in app/alembic/versions/NNN_*.py).
alembic revision --autogenerate -m "Description" --rev-id <next_rev_id>

# Seed database with test data
uv run python -m app.seed_data.seed_data
Expand Down
85 changes: 85 additions & 0 deletions backend/app/alembic/versions/064_add_run_mode_to_evaluation_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""add run_mode column and unique run-name constraint to evaluation_run

Revision ID: 064
Revises: 063
Create Date: 2026-05-20 00:00:00.000000

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "064"
down_revision = "063"
branch_labels = None
depends_on = None

disable_per_migration_transaction = True

_UNIQUE_INDEX = "uq_evaluation_run_org_project_run_name"
_UNIQUE_CONSTRAINT = "uq_evaluation_run_org_project_run_name"


def upgrade():
# 1. Add run_mode as nullable first so existing rows are backfilled by the
# server default, then tighten to NOT NULL. The server default is left
# in place as a safety net.
with op.get_context().autocommit_block():
op.add_column(
"evaluation_run",
sa.Column(
"run_mode",
sa.String(length=10),
nullable=True,
server_default=sa.text("'batch'"),
comment="Execution mode: batch or fast",
),
)
op.execute("ALTER TABLE evaluation_run ALTER COLUMN run_mode SET NOT NULL")

# 2. Resolve duplicate (organization_id, project_id, run_name) tuples
# non-destructively before adding the unique constraint. Keep the
# lowest-id row's run_name untouched and rename the rest by appending a
# unique "__dup_<id>" suffix so no historical run (and its scores, result
# URLs, or batch_job links) is lost.
with op.get_context().autocommit_block():
op.execute(
"""
UPDATE evaluation_run e
SET run_name = e.run_name || '__dup_' || e.id
WHERE e.id <> (
SELECT MIN(x.id)
FROM evaluation_run x
WHERE x.organization_id = e.organization_id
AND x.project_id = e.project_id
AND x.run_name = e.run_name
)
"""
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# 3. Build the unique index CONCURRENTLY so the scan does not take an
# AccessExclusiveLock, then attach it as a named constraint via
# ADD CONSTRAINT ... USING INDEX (brief catalog-only lock).
with op.get_context().autocommit_block():
op.execute(
f"CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "
f'"{_UNIQUE_INDEX}" '
f"ON evaluation_run (organization_id, project_id, run_name)"
)
op.execute(
f"ALTER TABLE evaluation_run "
f'ADD CONSTRAINT "{_UNIQUE_CONSTRAINT}" '
f'UNIQUE USING INDEX "{_UNIQUE_INDEX}"'
)


def downgrade():
# Reverse in opposite order to upgrade().
op.execute(
f"ALTER TABLE evaluation_run "
f'DROP CONSTRAINT IF EXISTS "{_UNIQUE_CONSTRAINT}"'
)
with op.get_context().autocommit_block():
op.execute(f'DROP INDEX CONCURRENTLY IF EXISTS "{_UNIQUE_INDEX}"')
op.drop_column("evaluation_run", "run_mode")
51 changes: 43 additions & 8 deletions backend/app/api/docs/evaluation/create_evaluation.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
Start an evaluation run using the OpenAI Batch API.
Start an evaluation run against a stored dataset.

Evaluations allow you to systematically test LLM configurations against
predefined datasets with automatic progress tracking and result collection.
Two execution modes are supported via the optional `run_mode` field:

* `batch` (default) — submits the work to the OpenAI Batch API. Cost-efficient
for large datasets; turnaround can take up to 24 hours.
* `fast` — runs the evaluation synchronously through the OpenAI Responses API
and returns results within seconds-to-minutes. Restricted to text
evaluations on datasets with at most `EVAL_FAST_MAX_UNIQUE_ROWS` unique rows.

**Key Features:**
* Fetches dataset items from Langfuse and creates a batch processing job via the OpenAI Batch API
* Asynchronous processing with automatic progress tracking (checks every 60s)
* Fetches dataset items from Langfuse and creates a job (batch or fast)
* Uses a stored config (created via `/configs`) to define the provider parameters
* Stores results for comparison and analysis
* Use `GET /evaluations/{evaluation_id}` to monitor progress and retrieve results
* Same scoring semantics across both modes — cosine similarity, Langfuse traces,
and optional LLM-as-Judge correctness
* Use `GET /evaluations/{evaluation_id}` to monitor progress and retrieve results;
the response carries `run_mode` so clients can tell the two paths apart

## Example
## Example (batch — default)

```json
{
Expand All @@ -20,3 +26,32 @@ predefined datasets with automatic progress tracking and result collection.
"config_version": 1
}
```

## Example (fast)

```json
{
"dataset_id": 123,
"experiment_name": "may19-temp0.2-gpt4o-fast",
"config_id": "f54f0d67-4817-4103-9fdf-b74b3d46733e",
"config_version": 1,
"run_mode": "fast"
}
```

## Fast-mode error responses

These apply only when `run_mode` is `fast`.

| Status | Code | When |
| --- | --- | --- |
| 422 | `config_type_unsupported` | Resolved config is not a text-evaluation config |
| 422 | `dataset_too_large_for_fast` | Dataset exceeds `EVAL_FAST_MAX_UNIQUE_ROWS` unique rows |

## General error responses

These apply to both `batch` and `fast` modes.

| Status | Code | When |
| --- | --- | --- |
| 409 | `run_name_already_exists` | A run with the same `experiment_name` already exists for this (organization, project) |
Comment thread
coderabbitai[bot] marked this conversation as resolved.
9 changes: 8 additions & 1 deletion backend/app/api/docs/evaluation/list_datasets.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
List all datasets for the current organization and project.

Returns a paginated list of datasets ordered by most recent first. Each dataset includes metadata (ID, name, item counts, duplication factor), Langfuse integration details, and object store URL.
Returns a paginated list of datasets ordered by most recent first. Each dataset includes metadata (ID, name, item counts, duplication factor), Langfuse integration details, object store URL, and an `eligible_for_fast` flag that is `true` when the dataset's unique-row count is within `EVAL_FAST_MAX_UNIQUE_ROWS` (and so can be used with `run_mode="fast"` on `POST /evaluations`).

## Query parameters

| Parameter | Description |
| --- | --- |
| `limit` / `offset` | Pagination (default 50 / 0; max limit 100) |
| `eligible_for` | If set to `fast`, the response is filtered to only datasets where `eligible_for_fast` is `true` |
27 changes: 22 additions & 5 deletions backend/app/api/routes/evaluations/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
from app.core.cloud import get_cloud_storage
from app.crud.evaluations import (
get_dataset_by_id,
)
from app.crud.evaluations import (
list_datasets as list_evaluation_datasets,
)
from app.crud.evaluations.dataset import delete_dataset as delete_dataset_crud
from app.models.evaluation import DatasetUploadResponse, EvaluationDataset
from app.services.evaluations import (
upload_dataset as upload_evaluation_dataset,
is_dataset_fast_eligible,
validate_csv_file,
)
from app.services.evaluations import (
upload_dataset as upload_evaluation_dataset,
)
from app.utils import (
APIResponse,
load_description,
Expand All @@ -39,13 +44,15 @@ def _dataset_to_response(
dataset: EvaluationDataset, signed_url: str | None = None
) -> DatasetUploadResponse:
"""Convert a dataset model to a DatasetUploadResponse."""
original_items = dataset.dataset_metadata.get("original_items_count", 0)
return DatasetUploadResponse(
dataset_id=dataset.id,
dataset_name=dataset.name,
description=dataset.description,
total_items=dataset.dataset_metadata.get("total_items_count", 0),
original_items=dataset.dataset_metadata.get("original_items_count", 0),
original_items=original_items,
duplication_factor=dataset.dataset_metadata.get("duplication_factor", 1),
eligible_for_fast=is_dataset_fast_eligible(original_items_count=original_items),
langfuse_dataset_id=dataset.langfuse_dataset_id,
object_store_url=dataset.object_store_url,
signed_url=signed_url,
Expand Down Expand Up @@ -104,6 +111,15 @@ def list_datasets(
default=50, ge=1, le=100, description="Maximum number of datasets to return"
),
offset: int = Query(default=0, ge=0, description="Number of datasets to skip"),
eligible_for: str
| None = Query(
default=None,
description=(
"If 'fast', return only datasets eligible for run_mode='fast' "
"(unique-row count within EVAL_FAST_MAX_UNIQUE_ROWS)."
),
enum=["fast"],
),
) -> APIResponse[list[DatasetUploadResponse]]:
"""List evaluation datasets."""
datasets = list_evaluation_datasets(
Expand All @@ -112,11 +128,12 @@ def list_datasets(
project_id=auth_context.project_.id,
limit=limit,
offset=offset,
eligible_for_fast=(eligible_for == "fast"),
)

return APIResponse.success_response(
data=[_dataset_to_response(dataset) for dataset in datasets]
)
dataset_responses = [_dataset_to_response(dataset) for dataset in datasets]

return APIResponse.success_response(data=dataset_responses)


@router.get(
Expand Down
30 changes: 28 additions & 2 deletions backend/app/api/routes/evaluations/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from uuid import UUID

from asgi_correlation_id import correlation_id
from fastapi import (
APIRouter,
Body,
Expand All @@ -12,13 +13,14 @@
)

from app.api.deps import AuthContextDep, SessionDep
from app.api.permissions import Permission, require_permission
from app.crud.evaluations import list_evaluation_runs as list_evaluation_runs_crud
from app.crud.evaluations.core import group_traces_by_question_id
from app.models.evaluation import EvaluationRunPublic
from app.api.permissions import Permission, require_permission
from app.models.evaluation import EvaluationRunPublic, RunModeEnum
from app.services.evaluations import (
get_evaluation_with_scores,
start_evaluation,
validate_and_start_fast_evaluation,
)
from app.utils import (
APIResponse,
Expand All @@ -45,8 +47,32 @@ def evaluate(
),
config_id: UUID = Body(..., description="Stored config ID"),
config_version: int = Body(..., ge=1, description="Stored config version"),
run_mode: RunModeEnum = Body(
default=RunModeEnum.BATCH,
description="Execution mode: 'batch' (default) or 'fast'",
),
) -> APIResponse[EvaluationRunPublic]:
"""Start an evaluation run."""
logger.info(
f"[evaluate] Starting evaluation | run_mode={run_mode.value} | "
f"experiment_name={experiment_name} | dataset_id={dataset_id} | "
f"org_id={auth_context.organization_.id} | "
f"project_id={auth_context.project_.id}"
)

if run_mode == RunModeEnum.FAST:
eval_run = validate_and_start_fast_evaluation(
session=session,
dataset_id=dataset_id,
run_name=experiment_name,
config_id=config_id,
config_version=config_version,
organization_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
trace_id=correlation_id.get() or "N/A",
)
return APIResponse.success_response(data=eval_run)

eval_run = start_evaluation(
session=session,
dataset_id=dataset_id,
Expand Down
7 changes: 7 additions & 0 deletions backend/app/celery/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def initialize_worker(**_) -> None:
include=[
"app.celery.tasks.job_execution",
"app.celery.tasks.notifications",
"app.celery.tasks.evaluation_fast",
],
)

Expand All @@ -186,6 +187,12 @@ def initialize_worker(**_) -> None:
routing_key="low",
queue_arguments={"x-max-priority": 1},
),
Queue(
"evaluations",
exchange=default_exchange,
routing_key="evaluations",
queue_arguments={"x-max-priority": 6},
),
Queue("cron", exchange=default_exchange, routing_key="cron"),
Queue("default", exchange=default_exchange, routing_key="default"),
),
Expand Down
56 changes: 56 additions & 0 deletions backend/app/celery/tasks/evaluation_fast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Celery task for the synchronous (fast) text-evaluation pipeline.

This module hosts the single orchestrator task per fast evaluation run. The
heavy lifting lives in `app/services/evaluations/fast.py`; this task is a thin
shim that sets the correlation id, attaches the OTel parent context, and
delegates.

See `Fast Evaluation SRD.md` for the design (queue, retries, idempotency).
"""

import logging

from celery import Task, current_task

from app.celery.celery_app import celery_app
from app.celery.tasks.job_execution import _run_with_otel_parent, _set_trace
from app.celery.utils import gevent_timeout
from app.core.config import settings

logger = logging.getLogger(__name__)

# Sentinel correlation id used when no trace id is propagated from the
# enqueueing request. Matches the codebase-wide "N/A" default (see
# app/core/logger.py and app/celery/utils.py).
DEFAULT_TRACE_ID = "N/A"


@celery_app.task(bind=True, queue="evaluations", priority=6)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_evaluation_fast")
def run_evaluation_fast(
self: Task, eval_run_id: int, trace_id: str = DEFAULT_TRACE_ID
) -> None:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"""Run the fast evaluation pipeline for one EvaluationRun.

Idempotency: each stage is skipped on retry when its `batch_job` marker is
already set on the EvaluationRun, so Celery redelivery never re-calls
OpenAI for work that already succeeded.

Args:
eval_run_id: ID of the EvaluationRun (run_mode="fast").
trace_id: Correlation id from the enqueueing request, propagated into
the worker for log correlation.
"""
from app.services.evaluations.fast import execute_fast_evaluation

_set_trace(trace_id)
logger.info(
f"[run_evaluation_fast] Starting fast evaluation task | "
f"eval_run_id={eval_run_id} | task_id={current_task.request.id}"
)

return _run_with_otel_parent(
self,
lambda: execute_fast_evaluation(eval_run_id=eval_run_id),
)
Loading
Loading