From 83e0ec1b8d68bf8461842fe3f7b85117967c7b9c Mon Sep 17 00:00:00 2001 From: chetanr25 Date: Sat, 20 Jun 2026 23:12:57 +0530 Subject: [PATCH 1/7] feat: add celery and redis to docker compose to pull celery & redis docker image, updated celery broker url to use redis endpoint. Added required packages. --- docker/.env.example | 4 ++++ docker/dev/compose.yml | 51 ++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 ++ 3 files changed, 57 insertions(+) diff --git a/docker/.env.example b/docker/.env.example index 026a231..d5ccca5 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -20,6 +20,10 @@ OLLAMA_TIMEOUT=300 WHISPER_HOST=http://whisper:9000 WHISPER_MODEL=small.en +# --- Celery / Redis ------------------------------------------------------- +CELERY_BROKER_URL=redis://redis:6379/0 +CELERY_RESULT_BACKEND=redis://redis:6379/0 + # --- Gunicorn (prod only) ------------------------------------------------- GUNICORN_WORKERS=2 diff --git a/docker/dev/compose.yml b/docker/dev/compose.yml index 83de924..22f8a5d 100644 --- a/docker/dev/compose.yml +++ b/docker/dev/compose.yml @@ -54,6 +54,21 @@ services: retries: 5 start_period: 60s + redis: + image: redis:7-alpine + container_name: fireform-redis + ports: + - "127.0.0.1:6379:6379" + volumes: + - redis_data:/data + networks: + - fireform-network + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + app: build: context: ../.. 
@@ -66,6 +81,8 @@ services: condition: service_healthy whisper: condition: service_started + redis: + condition: service_healthy entrypoint: ["sh", "docker/entrypoint.sh"] command: ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"] volumes: @@ -86,6 +103,38 @@ services: - FIREFORM_TEMPLATE_DIR=/data/uploads - FIREFORM_DB_ECHO=${FIREFORM_DB_ECHO:-true} - FRONTEND_ORIGINS=${FRONTEND_ORIGINS:-http://localhost:5173,http://127.0.0.1:5173} + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + networks: + - fireform-network + + celery-worker: + build: + context: ../.. + dockerfile: docker/dev/Dockerfile + container_name: fireform-celery-worker + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + ollama: + condition: service_healthy + command: ["celery", "-A", "app.core.celery:celery_app", "worker", "--loglevel=info", "--concurrency=4"] + volumes: + - ../..:/app + - fireform_uploads:/data/uploads + environment: + - PYTHONUNBUFFERED=1 + - PYTHONPATH=/app + - DATABASE_URL=postgresql://fireform:fireform@postgres:5432/fireform + - OLLAMA_HOST=${OLLAMA_HOST:-http://ollama:11434} + - OLLAMA_TIMEOUT=${OLLAMA_TIMEOUT:-300} + - OLLAMA_MODEL=${OLLAMA_MODEL:-qwen2.5:1.5b} + - FIREFORM_DATA_DIR=/data/uploads + - FIREFORM_TEMPLATE_DIR=/data/uploads + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 networks: - fireform-network @@ -98,6 +147,8 @@ volumes: driver: local fireform_uploads: driver: local + redis_data: + driver: local networks: fireform-network: diff --git a/requirements.txt b/requirements.txt index 5f57f06..066e00b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,5 @@ numpy<2 ollama pypdf python-multipart +celery[redis] +redis \ No newline at end of file From 2bdc700fe61b00b875dd020fad6571e65dc966b1 Mon Sep 17 00:00:00 2001 From: chetanr25 Date: Sat, 20 Jun 2026 23:39:37 +0530 Subject: [PATCH 2/7] configured 
celery to FireForm --- app/core/celery.py | 19 +++++++++++++++++++ app/core/config.py | 4 ++++ app/models/__init__.py | 4 ++-- 3 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 app/core/celery.py diff --git a/app/core/celery.py b/app/core/celery.py new file mode 100644 index 0000000..4efe5db --- /dev/null +++ b/app/core/celery.py @@ -0,0 +1,19 @@ +from celery import Celery + +from app.core.config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND + +celery_app = Celery( + "fireform", + broker=CELERY_BROKER_URL, + backend=CELERY_RESULT_BACKEND, +) + +celery_app.conf.update( + task_serializer="json", + result_serializer="json", + accept_content=["json"], + task_track_started=True, + result_expires=86400, +) + +celery_app.conf.include = ["app.tasks.fill"] diff --git a/app/core/config.py b/app/core/config.py index d6f723d..d89cea7 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -37,6 +37,10 @@ OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:1.5b") WHISPER_HOST = os.getenv("WHISPER_HOST", "http://localhost:9000").rstrip("/") +# --- Celery / Redis ------------------------------------------------------- +CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") +CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0") + # --- CORS ----------------------------------------------------------------- _DEFAULT_ORIGINS = "http://127.0.0.1:5173,http://localhost:5173" ALLOWED_ORIGINS = [ diff --git a/app/models/__init__.py b/app/models/__init__.py index f46e17d..9c5433b 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -1,5 +1,5 @@ """ORM models. 
Import from here: `from app.models import Template`.""" -from app.models.models import FormSubmission, Template +from app.models.models import FormSubmission, Job, Template -__all__ = ["Template", "FormSubmission"] +__all__ = ["Template", "FormSubmission", "Job"] From 53f36d9043ba27e4d574111198222228b1a4f0d3 Mon Sep 17 00:00:00 2001 From: chetanr25 Date: Sat, 20 Jun 2026 23:55:24 +0530 Subject: [PATCH 3/7] added required schemas for AsyncFormFill and to track celery jobs --- app/api/schemas/forms.py | 38 ++++++++++++++++++++++++++++++++++++-- app/models/models.py | 15 ++++++++++++++- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/app/api/schemas/forms.py b/app/api/schemas/forms.py index 6a833c8..8be2882 100644 --- a/app/api/schemas/forms.py +++ b/app/api/schemas/forms.py @@ -1,9 +1,9 @@ from pydantic import BaseModel, field_validator + class FormFill(BaseModel): template_id: int input_text: str - # Optional Ollama model override for this fill; falls back to OLLAMA_MODEL. 
model: str | None = None @field_validator("input_text") @@ -29,4 +29,38 @@ class TranscriptionResponse(BaseModel): class ModelsResponse(BaseModel): models: list[str] - default: str \ No newline at end of file + default: str + + +class AsyncFormFill(BaseModel): + template_ids: list[int] + input_text: str + model: str | None = None + + @field_validator("input_text") + def validate_input_text(cls, value): + if not value or not value.strip(): + raise ValueError("Input text cannot be empty") + return value + + @field_validator("template_ids") + def validate_template_ids(cls, value): + if not value: + raise ValueError("template_ids cannot be empty") + return value + + +class JobResponse(BaseModel): + id: int + celery_task_id: str + template_id: int + status: str + output_pdf_path: str | None = None + error: str | None = None + + class Config: + from_attributes = True + + +class AsyncFormFillResponse(BaseModel): + jobs: list[JobResponse] \ No newline at end of file diff --git a/app/models/models.py b/app/models/models.py index 8f82655..9afc2bf 100644 --- a/app/models/models.py +++ b/app/models/models.py @@ -16,4 +16,17 @@ class FormSubmission(SQLModel, table=True): template_id: int = Field(foreign_key="template.id") input_text: str output_pdf_path: str - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) \ No newline at end of file + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class Job(SQLModel, table=True): + id: int | None = Field(default=None, primary_key=True) + celery_task_id: str = Field(index=True) + template_id: int = Field(foreign_key="template.id") + input_text: str + status: str = Field(default="pending") + output_pdf_path: str | None = None + error: str | None = None + model: str | None = None + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) \ No newline at end of file From 
436b15c12fc18c36ade92b119a1222b7aa0c4f2d Mon Sep 17 00:00:00 2001 From: chetanr25 Date: Sun, 21 Jun 2026 17:37:04 +0530 Subject: [PATCH 4/7] feat: add universal job tracking with UUID, progress, and structured errors --- app/api/router.py | 3 +- app/api/routes/jobs.py | 60 ++++++++++++++++++++++++++++++++++++++ app/api/schemas/forms.py | 23 +++++++++++---- app/db/repositories.py | 31 +++++++++++++++++++- app/models/models.py | 17 +++++++---- app/tasks/fill.py | 63 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 183 insertions(+), 14 deletions(-) create mode 100644 app/api/routes/jobs.py create mode 100644 app/tasks/fill.py diff --git a/app/api/router.py b/app/api/router.py index cba6242..27b7bcb 100644 --- a/app/api/router.py +++ b/app/api/router.py @@ -9,10 +9,11 @@ from fastapi import APIRouter -from app.api.routes import forms, templates +from app.api.routes import forms, jobs, templates from app.api.v1.router import v1_router api_router = APIRouter() api_router.include_router(templates.router) api_router.include_router(forms.router) api_router.include_router(v1_router) +api_router.include_router(jobs.router) \ No newline at end of file diff --git a/app/api/routes/jobs.py b/app/api/routes/jobs.py new file mode 100644 index 0000000..5af9008 --- /dev/null +++ b/app/api/routes/jobs.py @@ -0,0 +1,60 @@ +from fastapi import APIRouter, Depends +from sqlmodel import Session + +from app.api.deps import get_db +from app.api.schemas.forms import ( + AsyncFormFill, + AsyncFormFillResponse, + AsyncJobSubmitResponse, + JobResponse, +) +from app.core.errors.base import AppError +from app.db.repositories import create_job, get_job_by_uuid, get_template +from app.models import Job +from app.tasks.fill import fill_form_task + +router = APIRouter(tags=["jobs"]) + + +@router.get("/jobs/{job_id}", response_model=JobResponse) +def get_job_status(job_id: str, db: Session = Depends(get_db)): + job = get_job_by_uuid(db, job_id) + if not job: + raise AppError("Job not found", 
status_code=404) + return JobResponse( + job_id=job.job_id, + job_type=job.job_type, + status=job.status, + progress_percent=job.progress_percent, + result_url=job.result_url, + error=job.error, + created_at=job.created_at.isoformat() if job.created_at else None, + updated_at=job.updated_at.isoformat() if job.updated_at else None, + ) + + +@router.post("/forms/jobs", response_model=AsyncFormFillResponse) +def submit_async_form_fill(form: AsyncFormFill, db: Session = Depends(get_db)): + for tid in form.template_ids: + if not get_template(db, tid): + raise AppError(f"Template {tid} not found", status_code=404) + + jobs: list[AsyncJobSubmitResponse] = [] + for tid in form.template_ids: + result = fill_form_task.delay(tid, form.input_text, form.model) + job = Job( + celery_task_id=result.id, + job_type="form_generation", + template_id=tid, + input_text=form.input_text, + status="queued", + model=form.model, + ) + job = create_job(db, job) + jobs.append(AsyncJobSubmitResponse( + job_id=job.job_id, + status=job.status, + poll_url=f"/api/v1/jobs/{job.job_id}", + )) + + return AsyncFormFillResponse(jobs=jobs) diff --git a/app/api/schemas/forms.py b/app/api/schemas/forms.py index 8be2882..155b14f 100644 --- a/app/api/schemas/forms.py +++ b/app/api/schemas/forms.py @@ -51,16 +51,27 @@ def validate_template_ids(cls, value): class JobResponse(BaseModel): - id: int - celery_task_id: str - template_id: int + job_id: str + job_type: str + status: str + progress_percent: int = 0 + result_url: str | None = None + error: dict | None = None + created_at: str | None = None + updated_at: str | None = None + + class Config: + from_attributes = True + + +class AsyncJobSubmitResponse(BaseModel): + job_id: str status: str - output_pdf_path: str | None = None - error: str | None = None + poll_url: str class Config: from_attributes = True class AsyncFormFillResponse(BaseModel): - jobs: list[JobResponse] \ No newline at end of file + jobs: list[AsyncJobSubmitResponse] \ No newline at end of 
file diff --git a/app/db/repositories.py b/app/db/repositories.py index ebb80de..8b6a2c4 100644 --- a/app/db/repositories.py +++ b/app/db/repositories.py @@ -1,5 +1,5 @@ from sqlmodel import Session, select -from app.models import Template, FormSubmission +from app.models import Template, FormSubmission, Job # Templates def create_template(session: Session, template: Template) -> Template: @@ -22,3 +22,32 @@ def create_form(session: Session, form: FormSubmission) -> FormSubmission: session.commit() session.refresh(form) return form + + +# Jobs +def create_job(session: Session, job: Job) -> Job: + session.add(job) + session.commit() + session.refresh(job) + return job + + +def get_job(session: Session, job_id: int) -> Job | None: + return session.get(Job, job_id) + + +def get_job_by_uuid(session: Session, job_uuid: str) -> Job | None: + statement = select(Job).where(Job.job_id == job_uuid) + return session.exec(statement).first() + + +def get_job_by_celery_id(session: Session, celery_task_id: str) -> Job | None: + statement = select(Job).where(Job.celery_task_id == celery_task_id) + return session.exec(statement).first() + + +def update_job(session: Session, job: Job) -> Job: + session.add(job) + session.commit() + session.refresh(job) + return job diff --git a/app/models/models.py b/app/models/models.py index 9afc2bf..c4ddf0b 100644 --- a/app/models/models.py +++ b/app/models/models.py @@ -1,5 +1,7 @@ -from sqlmodel import SQLModel, Field +import uuid as uuid_mod + from sqlalchemy import Column, JSON +from sqlmodel import SQLModel, Field from datetime import datetime, timezone @@ -21,12 +23,15 @@ class FormSubmission(SQLModel, table=True): class Job(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) + job_id: str = Field(default_factory=lambda: str(uuid_mod.uuid4()), index=True, unique=True) celery_task_id: str = Field(index=True) - template_id: int = Field(foreign_key="template.id") - input_text: str - status: str = 
Field(default="pending") - output_pdf_path: str | None = None - error: str | None = None + job_type: str = Field(default="form_generation") + template_id: int | None = Field(default=None, foreign_key="template.id") + input_text: str | None = None + status: str = Field(default="queued") + progress_percent: int = Field(default=0) + result_url: str | None = None + error: dict | None = Field(default=None, sa_column=Column(JSON)) model: str | None = None created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) \ No newline at end of file diff --git a/app/tasks/fill.py b/app/tasks/fill.py new file mode 100644 index 0000000..fc7b311 --- /dev/null +++ b/app/tasks/fill.py @@ -0,0 +1,63 @@ +import logging +from datetime import datetime, timezone + +from app.core.celery import celery_app +from app.db.database import get_session +from app.db.repositories import get_job_by_celery_id, get_template, update_job, create_form +from app.models import FormSubmission +from app.services.controller import Controller + +logger = logging.getLogger(__name__) + + +@celery_app.task(bind=True, name="fill_form") +def fill_form_task(self, template_id: int, input_text: str, model: str | None = None): + session = next(get_session()) + try: + job = get_job_by_celery_id(session, self.request.id) + if not job: + raise RuntimeError(f"No job row for celery task {self.request.id}") + + job.status = "processing" + job.progress_percent = 10 + job.updated_at = datetime.now(timezone.utc) + update_job(session, job) + + template = get_template(session, template_id) + if not template: + raise ValueError(f"Template {template_id} not found") + + controller = Controller() + path = controller.fill_form( + user_input=input_text, + fields=template.fields, + pdf_form_path=template.pdf_path, + model=model, + ) + + submission = FormSubmission( + template_id=template_id, + input_text=input_text, + 
output_pdf_path=path, + ) + create_form(session, submission) + + job.status = "completed" + job.progress_percent = 100 + job.result_url = f"/api/v1/forms/{submission.id}/download" + job.updated_at = datetime.now(timezone.utc) + update_job(session, job) + + return {"job_id": job.job_id, "result_url": job.result_url} + + except Exception as e: + logger.exception("fill_form_task failed") + job = get_job_by_celery_id(session, self.request.id) + if job: + job.status = "failed" + job.error = {"error_code": "TASK_FAILED", "message": str(e)} + job.updated_at = datetime.now(timezone.utc) + update_job(session, job) + raise + finally: + session.close() From 838b85ae9949aefc6e7230ca4949b5281bee1150 Mon Sep 17 00:00:00 2001 From: chetanr25 Date: Sun, 21 Jun 2026 17:37:28 +0530 Subject: [PATCH 5/7] added make command to see logs for workers --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index 13ea366..0c8fa4f 100644 --- a/Makefile +++ b/Makefile @@ -25,6 +25,7 @@ help: @echo "make logs - Stream all container logs" @echo "make logs-app - Stream app container logs" @echo "make logs-ollama - Stream Ollama container logs" + @echo "make logs-worker - Stream Celery worker logs" @echo "make shell - Open shell in running app container" @echo "make pull-model - Pull Ollama model from .env.dev ($(OLLAMA_MODEL))" @echo "make test - Run test suite" @@ -80,6 +81,9 @@ logs-app: logs-ollama: @$(COMPOSE) logs -f ollama +logs-worker: + @$(COMPOSE) logs -f celery-worker + shell: @$(COMPOSE) exec app /bin/sh From 77661fcac208eb33e310e85f6be3467a4a8d2cf2 Mon Sep 17 00:00:00 2001 From: chetanr25 Date: Sun, 21 Jun 2026 17:37:56 +0530 Subject: [PATCH 6/7] added init.py for tasks --- app/tasks/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 app/tasks/__init__.py diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..e69de29 From 60e1ce5cbd51f8951ca135170a3da0877884b6d2 Mon Sep 17 00:00:00 
2001 From: chetanr25 Date: Sun, 21 Jun 2026 17:40:23 +0530 Subject: [PATCH 7/7] added tests for celery workers --- tests/conftest.py | 2 +- tests/test_jobs.py | 111 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 tests/test_jobs.py diff --git a/tests/conftest.py b/tests/conftest.py index 56dea60..2699127 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,7 +14,7 @@ from app.main import app from app.api.deps import get_db -from app.models import Template, FormSubmission # noqa: F401 — registers tables +from app.models import Template, FormSubmission, Job # noqa: F401 — registers tables # --------------------------------------------------------------------------- # In-memory database diff --git a/tests/test_jobs.py b/tests/test_jobs.py new file mode 100644 index 0000000..0355a7c --- /dev/null +++ b/tests/test_jobs.py @@ -0,0 +1,111 @@ +"""Tests for async job submission and status endpoints.""" + +from unittest.mock import patch, MagicMock + + +class TestJobEndpoints: + + def _seed_template(self, client): + resp = client.post("/templates/create", json={ + "name": "Test Template", + "pdf_path": "test.pdf", + "fields": {"name": "string"}, + }) + return resp.json()["id"] + + @patch("app.api.routes.jobs.fill_form_task") + def test_submit_async_single(self, mock_task, client): + mock_result = MagicMock() + mock_result.id = "celery-task-id-1" + mock_task.delay.return_value = mock_result + + tpl_id = self._seed_template(client) + resp = client.post("/forms/jobs", json={ + "template_ids": [tpl_id], + "input_text": "John Doe firefighter", + }) + assert resp.status_code == 200 + data = resp.json() + assert len(data["jobs"]) == 1 + assert data["jobs"][0]["status"] == "queued" + assert "job_id" in data["jobs"][0] + assert data["jobs"][0]["poll_url"].startswith("/api/v1/jobs/") + mock_task.delay.assert_called_once_with(tpl_id, "John Doe firefighter", None) + + @patch("app.api.routes.jobs.fill_form_task") 
+ def test_submit_async_batch(self, mock_task, client): + mock_task.delay.side_effect = [ + MagicMock(id="task-1"), + MagicMock(id="task-2"), + ] + + t1 = self._seed_template(client) + t2 = self._seed_template(client) + resp = client.post("/forms/jobs", json={ + "template_ids": [t1, t2], + "input_text": "batch input", + }) + assert resp.status_code == 200 + jobs = resp.json()["jobs"] + assert len(jobs) == 2 + assert jobs[0]["job_id"] != jobs[1]["job_id"] + assert mock_task.delay.call_count == 2 + + @patch("app.api.routes.jobs.fill_form_task") + def test_submit_async_missing_template(self, mock_task, client): + resp = client.post("/forms/jobs", json={ + "template_ids": [9999], + "input_text": "some text", + }) + assert resp.status_code == 404 + mock_task.delay.assert_not_called() + + @patch("app.api.routes.jobs.fill_form_task") + def test_get_job_status(self, mock_task, client): + mock_task.delay.return_value = MagicMock(id="celery-abc") + + tpl_id = self._seed_template(client) + submit_resp = client.post("/forms/jobs", json={ + "template_ids": [tpl_id], + "input_text": "test input", + }) + job_id = submit_resp.json()["jobs"][0]["job_id"] + + resp = client.get(f"/jobs/{job_id}") + assert resp.status_code == 200 + data = resp.json() + assert data["job_id"] == job_id + assert data["job_type"] == "form_generation" + assert data["status"] == "queued" + assert data["progress_percent"] == 0 + + def test_get_job_not_found(self, client): + resp = client.get("/jobs/00000000-0000-0000-0000-000000000000") + assert resp.status_code == 404 + + @patch("app.api.routes.jobs.fill_form_task") + def test_submit_with_model_override(self, mock_task, client): + mock_task.delay.return_value = MagicMock(id="celery-xyz") + + tpl_id = self._seed_template(client) + resp = client.post("/forms/jobs", json={ + "template_ids": [tpl_id], + "input_text": "test", + "model": "mistral:latest", + }) + assert resp.status_code == 200 + mock_task.delay.assert_called_once_with(tpl_id, "test", 
"mistral:latest") + + def test_submit_empty_template_ids(self, client): + resp = client.post("/forms/jobs", json={ + "template_ids": [], + "input_text": "test", + }) + assert resp.status_code == 422 + + def test_submit_empty_input_text(self, client): + resp = client.post("/forms/jobs", json={ + "template_ids": [1], + "input_text": "", + }) + assert resp.status_code == 422