From 457f1978ef66724c967e20da8f1feef5d793a321 Mon Sep 17 00:00:00 2001 From: frozenhelium Date: Tue, 26 May 2026 11:38:22 +0545 Subject: [PATCH 1/2] feat(examples): add script to generate coco from csv --- ...rate_coco_with_annotations_from_dropbox.py | 526 ++++++++++++++++++ 1 file changed, 526 insertions(+) create mode 100644 examples/generate-coco-from-dropbox/generate_coco_with_annotations_from_dropbox.py diff --git a/examples/generate-coco-from-dropbox/generate_coco_with_annotations_from_dropbox.py b/examples/generate-coco-from-dropbox/generate_coco_with_annotations_from_dropbox.py new file mode 100644 index 0000000..ca6115b --- /dev/null +++ b/examples/generate-coco-from-dropbox/generate_coco_with_annotations_from_dropbox.py @@ -0,0 +1,526 @@ +# /// script +# requires-python = ">=3.13" +# dependencies = [ +# "httpx~=0.28.1", +# "colorama", +# "pillow", +# ] +# /// +from __future__ import annotations + +import argparse +import csv +import io +import json +import os +import re +import sys +import textwrap +from collections import OrderedDict +from pathlib import PurePosixPath + +import httpx +from colorama import init as colorama_init, Fore + +colorama_init(autoreset=True) + + +ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"} + +DROPBOX_PERMISSION_MESSAGE = f""" +{Fore.YELLOW} +---------------------------------------------------- +Make sure the dropbox App includes these permissions +- files.metadata.read +- files.content.write +- files.content.read +- sharing.write +- sharing.read +""" + + +def die(msg: str, code: int = 2) -> None: + print(f"{Fore.RED}{msg}", file=sys.stderr) + sys.exit(code) + + +def dropbox_request_error_handler(res: httpx.Response) -> None: + try: + res.raise_for_status() + except httpx.HTTPStatusError as http_err: + print(f"{Fore.RED}HTTP error occurred while requesting {res.url}: {http_err}") + print(f"{Fore.RED}Response content: {res.text}") + print(DROPBOX_PERMISSION_MESSAGE) + raise + except httpx.RequestError as req_err: + print( + f"{Fore.RED}An error occurred while making the request to {res.url}: {req_err}" + ) + print(DROPBOX_PERMISSION_MESSAGE) + raise + + +def dropbox_rpc(endpoint: str, data: object, *, access_token: str) -> dict: + res = httpx.post( + f"https://api.dropboxapi.com/2/{endpoint}", + headers={ + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + }, + data=json.dumps(data), + ) + dropbox_request_error_handler(res) + return res.json() + + +def dropbox_download(path: str, *, access_token: str) -> bytes: + res = httpx.post( + "https://content.dropboxapi.com/2/files/download", + headers={ + "Authorization": f"Bearer {access_token}", + "Dropbox-API-Arg": json.dumps({"path": path}), + }, + ) + dropbox_request_error_handler(res) + return res.content + + +def dropbox_upload(path: str, content: bytes, *, access_token: str) -> dict: + res = httpx.post( + "https://content.dropboxapi.com/2/files/upload", + headers={ + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/octet-stream", + "Dropbox-API-Arg": json.dumps( + { + "path": path, + "mode": "overwrite", + "autorename": False, + "mute": False, + } + ), + }, + content=content, + ) + dropbox_request_error_handler(res) + return res.json() + + +def list_all_files( + folder_path: str, *, access_token: str, recursive: bool +) -> list[dict]: + entries: list[dict] = [] + response = dropbox_rpc( + "files/list_folder", + {"path": folder_path, "recursive": recursive}, + access_token=access_token, + ) + entries.extend(response.get("entries", [])) + while response.get("has_more", False): + response = dropbox_rpc( + "files/list_folder/continue", + {"cursor": response["cursor"]}, + access_token=access_token, + ) + entries.extend(response.get("entries", [])) + return entries + + +def collect_images(folder_path: str, *, access_token: str) -> list[dict]: + entries = list_all_files(folder_path, access_token=access_token, recursive=True) + images = [ + e + for e in entries + if e.get(".tag") == "file" + and PurePosixPath(e["name"]).suffix.lower() in ALLOWED_EXTENSIONS + ] + images.sort(key=lambda e: e["path_display"].lower()) + return images + + +def relative_under(path_display: str, base: str) -> str: + base_norm = base.rstrip("/").lower() + file_norm = path_display.lower() + prefix = base_norm + "/" + if file_norm.startswith(prefix): + return path_display[len(prefix):] + if file_norm == base_norm: + return "" + # Fallback — shouldn't happen given Dropbox listing semantics. + return path_display.lstrip("/") + + +def share_file_get_url(path: str, *, access_token: str) -> str: + res = dropbox_rpc( + "sharing/list_shared_links", + {"path": path, "direct_only": True}, + access_token=access_token, + ) + if res.get("links"): + link = res["links"][0]["url"] + else: + res_create = dropbox_rpc( + "sharing/create_shared_link_with_settings", + {"path": path, "settings": {"requested_visibility": "public"}}, + access_token=access_token, + ) + link = res_create["url"] + return re.sub(r"&dl=0\b", "", link) + "&raw=1" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Generate a COCO file with annotations from a Dropbox folder " + "containing a CSV metadata file and an images directory." + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent(DROPBOX_PERMISSION_MESSAGE), + ) + parser.add_argument( + "base_folder", + help='Dropbox path that contains the images folder and CSV. e.g. "/Indonesia"', + ) + parser.add_argument( + "--output", + default=None, + help="Local path to write the COCO JSON. Default: ./.coco.json", + ) + parser.add_argument( + "--images-folder", + default="images/", + help="Images folder, relative to base_folder (absolute Dropbox path also accepted). Default: images/", + ) + parser.add_argument( + "--csv-path", + default="metadata.csv", + help="CSV file path, relative to base_folder (absolute also accepted). Default: metadata.csv", + ) + parser.add_argument( + "--category-column", + default="type", + help="CSV column used to derive COCO categories. Default: type", + ) + parser.add_argument( + "--image-name-column", + default="image_name", + help="CSV column with the image path relative to --images-folder. Default: image_name", + ) + parser.add_argument( + "--boxes-column", + default="boxes", + help="CSV column with the bounding box (JSON-encoded 4-element list). Default: boxes", + ) + parser.add_argument( + "--box-format", + choices=["xyxy", "xywh"], + default="xyxy", + help="Format of values in --boxes-column. xyxy is converted to COCO xywh on output. Default: xyxy", + ) + parser.add_argument( + "--annotation-id-column", + default=None, + help="CSV column to use as annotation id (must be unique). Default: sequential 1..N", + ) + parser.add_argument( + "--image-id-column", + default=None, + help="CSV column to use as image id (must be consistent per image). Default: sequential 1..N", + ) + parser.add_argument( + "--include-empty-images", + action=argparse.BooleanOptionalAction, + default=True, + help="Include images in images[] even when they have no CSV row. Default: include", + ) + strictness = parser.add_mutually_exclusive_group() + strictness.add_argument( + "--strict", + dest="strict", + action="store_true", + help="(default) Hard-error on CSV rows referencing missing images.", + ) + strictness.add_argument( + "--lenient", + dest="strict", + action="store_false", + help="Skip-and-warn on CSV rows referencing missing images.", + ) + parser.set_defaults(strict=True) + parser.add_argument( + "--probe-dimensions", + action="store_true", + help="Download each image and probe width/height with Pillow.", + ) + parser.add_argument( + "--upload-to-dropbox", + action="store_true", + help="Also upload the COCO JSON to / in Dropbox.", + ) + return parser.parse_args() + + +def resolve_dbx_path(base_folder: str, sub: str) -> str: + # If sub is an absolute Dropbox path, return as-is; otherwise join under base. + if sub.startswith("/"): + return str(PurePosixPath(sub)) + return str(PurePosixPath(base_folder) / sub) + + +def coerce_id(raw: str) -> int | str: + raw = raw.strip() + if raw.lstrip("-").isdigit(): + return int(raw) + return raw + + +def main() -> None: + args = parse_args() + + access_token = os.environ.get("DROPBOX_ACCESS_TOKEN") + if not access_token: + die("DROPBOX_ACCESS_TOKEN env var is not set.") + + base_folder = args.base_folder.rstrip("/") or "/" + images_folder_dbx = resolve_dbx_path(base_folder, args.images_folder).rstrip("/") + csv_path_dbx = resolve_dbx_path(base_folder, args.csv_path) + + output_path = ( + args.output + or f"./{PurePosixPath(base_folder).name or 'coco'}.coco.json" + ) + + # 1. List images + print(f"Listing images under {images_folder_dbx} ...") + file_entries = collect_images(images_folder_dbx, access_token=access_token) + print(f" {len(file_entries)} image(s) found") + + image_by_relpath: dict[str, dict] = OrderedDict() + for entry in file_entries: + rel = relative_under(entry["path_display"], images_folder_dbx) + if not rel: + continue + if rel in image_by_relpath: + die(f"Duplicate relative path {rel!r} under {images_folder_dbx}.") + image_by_relpath[rel] = entry + + # 2. Download CSV + print(f"Downloading CSV from {csv_path_dbx} ...") + csv_bytes = dropbox_download(csv_path_dbx, access_token=access_token) + csv_reader = csv.DictReader(io.StringIO(csv_bytes.decode("utf-8-sig"))) + rows = list(csv_reader) + headers = list(csv_reader.fieldnames or []) + print(f" {len(rows)} CSV row(s); columns: {headers}") + + required_cols = { + args.image_name_column, + args.boxes_column, + args.category_column, + } + if args.annotation_id_column: + required_cols.add(args.annotation_id_column) + if args.image_id_column: + required_cols.add(args.image_id_column) + missing = required_cols - set(headers) + if missing: + die(f"CSV is missing required columns: {sorted(missing)}") + + # 3. Validate image references + unmatched: list[tuple[int, str]] = [] + for row_idx, row in enumerate(rows, start=1): + name = (row.get(args.image_name_column) or "").strip() + if name not in image_by_relpath: + unmatched.append((row_idx, name)) + if unmatched: + preview = "\n".join(f" row {idx}: {name!r}" for idx, name in unmatched[:20]) + more = ( + f"\n ... ({len(unmatched) - 20} more)" if len(unmatched) > 20 else "" + ) + msg = ( + f"{len(unmatched)} CSV row(s) reference images not found under " + f"{images_folder_dbx}:\n{preview}{more}" + ) + if args.strict: + die( + f"{msg}\nAborting (--strict). Re-run with --lenient to skip these rows." + ) + print(f"{Fore.YELLOW}WARNING: {msg}") + skip = {n for _, n in unmatched} + rows = [ + r for r in rows if (r.get(args.image_name_column) or "").strip() not in skip + ] + + # 4. Build categories[] (id by order of first appearance). + # Empty values are routed to an "unknown" category, added on first occurrence. + UNKNOWN_NAME = "unknown" + categories: list[dict] = [] + cat_id_by_name: dict[str, int] = {} + unknown_count = 0 + for row in rows: + raw = (row.get(args.category_column) or "").strip() + name = raw or UNKNOWN_NAME + if not raw: + unknown_count += 1 + if name not in cat_id_by_name: + cat_id_by_name[name] = len(categories) + 1 + categories.append({"id": cat_id_by_name[name], "name": name}) + if unknown_count: + print( + f"{Fore.YELLOW}WARNING: {unknown_count} row(s) had empty " + f"{args.category_column!r}; routed to category {UNKNOWN_NAME!r}." + ) + + # 6. Assign image ids + rows_by_image: dict[str, list[dict]] = {} + for row in rows: + rows_by_image.setdefault(row[args.image_name_column].strip(), []).append(row) + + image_id_by_relpath: dict[str, int | str] = {} + if args.image_id_column: + for relpath, row_list in rows_by_image.items(): + ids = {r[args.image_id_column].strip() for r in row_list} + if len(ids) > 1: + die( + f"Image {relpath!r} has inconsistent values in " + f"{args.image_id_column!r}: {sorted(ids)}" + ) + image_id_by_relpath[relpath] = coerce_id(next(iter(ids))) + used_int = [v for v in image_id_by_relpath.values() if isinstance(v, int)] + next_id = (max(used_int) + 1) if used_int else 1 + for relpath in image_by_relpath: + if relpath not in image_id_by_relpath: + image_id_by_relpath[relpath] = next_id + next_id += 1 + else: + for idx, relpath in enumerate(image_by_relpath.keys(), start=1): + image_id_by_relpath[relpath] = idx + + # 7. Drop empty images if not included + if not args.include_empty_images: + kept = {r[args.image_name_column].strip() for r in rows} + image_by_relpath = OrderedDict( + (rp, e) for rp, e in image_by_relpath.items() if rp in kept + ) + + # 8. Share links + optional dimension probe + images_out: list[dict] = [] + total = len(image_by_relpath) + for i, (relpath, entry) in enumerate(image_by_relpath.items(), start=1): + print(f"{i}/{total} sharing {relpath}") + url = share_file_get_url(entry["path_lower"], access_token=access_token) + record: dict = { + "id": image_id_by_relpath[relpath], + "file_name": relpath, + "coco_url": url, + } + if args.probe_dimensions: + from PIL import Image # lazy import + + img_bytes = dropbox_download(entry["path_lower"], access_token=access_token) + with Image.open(io.BytesIO(img_bytes)) as im: + record["width"], record["height"] = im.size + images_out.append(record) + + # 9. Build annotations[] + attribute_columns = [ + c + for c in headers + if c + not in { + args.image_name_column, + args.boxes_column, + args.category_column, + args.annotation_id_column, + args.image_id_column, + } + ] + annotations_out: list[dict] = [] + seen_ann_ids: set = set() + + for row_idx, row in enumerate(rows, start=1): + relpath = row[args.image_name_column].strip() + image_id = image_id_by_relpath[relpath] + + raw_box = row[args.boxes_column] + try: + box = json.loads(raw_box) + except (json.JSONDecodeError, TypeError) as exc: + die( + f"Row {row_idx}: failed to parse {args.boxes_column!r} as JSON: {exc}" + ) + if not isinstance(box, list) or len(box) != 4: + die( + f"Row {row_idx}: {args.boxes_column!r} is not a 4-element list: " + f"{box!r}" + ) + if args.box_format == "xyxy": + x1, y1, x2, y2 = box + bbox = [x1, y1, x2 - x1, y2 - y1] + else: + bbox = list(box) + + if args.annotation_id_column: + ann_id = coerce_id(row[args.annotation_id_column]) + if ann_id in seen_ann_ids: + die(f"Row {row_idx}: duplicate annotation id {ann_id!r}") + seen_ann_ids.add(ann_id) + else: + ann_id = len(annotations_out) + 1 + + cat_name = (row.get(args.category_column) or "").strip() or UNKNOWN_NAME + annotations_out.append( + { + "id": ann_id, + "image_id": image_id, + "bbox": bbox, + "category_id": cat_id_by_name[cat_name], + "attributes": {col: row[col] for col in attribute_columns}, + } + ) + + coco = { + "images": images_out, + "annotations": annotations_out, + "categories": categories, + } + + # 10. Write locally + with open(output_path, "w") as f: + json.dump(coco, f, indent=2) + print(f"{Fore.GREEN}Wrote {output_path}") + + # 11. Optional Dropbox upload + dropbox_link: str | None = None + if args.upload_to_dropbox: + out_basename = PurePosixPath(output_path).name + dest = str(PurePosixPath(base_folder) / out_basename) + print(f"Uploading COCO file to {dest} ...") + dropbox_upload( + dest, json.dumps(coco).encode("utf-8"), access_token=access_token + ) + link_res = dropbox_rpc( + "files/get_temporary_link", {"path": dest}, access_token=access_token + ) + dropbox_link = link_res["link"] + print(f"{Fore.GREEN}Uploaded to {dest}") + print(f"{Fore.GREEN}Temporary link: {dropbox_link}") + + # 12. Summary + images_with_detections = len( + {r[args.image_name_column].strip() for r in rows} + & set(image_by_relpath.keys()) + ) + print() + print(f"{Fore.CYAN}Summary:") + print(f" Categories: {len(categories)}") + print( + f" Images: {len(images_out)} ({images_with_detections} with detections)" + ) + print(f" Annotations: {len(annotations_out)}") + print(f" Output: {output_path}") + if dropbox_link: + print(f" Dropbox: {dropbox_link}") + + +if __name__ == "__main__": + main() From bf630602d3f0037ada1c2fd28814342409730576 Mon Sep 17 00:00:00 2001 From: tnagorra Date: Wed, 27 May 2026 16:40:10 +0545 Subject: [PATCH 2/2] refactor(examples): update script for annotation based coco generation --- examples/generate-coco-from-dropbox/README.md | 95 +- ...rate_coco_with_annotations_from_dropbox.py | 1230 ++++++++++++----- 2 files changed, 978 insertions(+), 347 deletions(-) diff --git a/examples/generate-coco-from-dropbox/README.md b/examples/generate-coco-from-dropbox/README.md index 23d898e..a0382d8 100644 --- a/examples/generate-coco-from-dropbox/README.md +++ b/examples/generate-coco-from-dropbox/README.md @@ -9,16 +9,19 @@ permalink: /examples/generate-coco-from-dropbox/ ## Background -[Assess Images](../../docs/project_types/assess_images.md) projects are created from a COCO-format JSON file describing the images to be mapped. This page provides a Python script that produces a minimal COCO file (`{ "images": [...] }`) from a folder of images hosted on Dropbox, so they can be referenced by public URL. The script also uploads the resulting JSON back to the same Dropbox folder. +[Assess Images](../../docs/project_types/assess_images.md) projects are created from a COCO-format JSON file describing the images to be mapped. This page provides two Python scripts that generate such a file from a Dropbox folder, exposing image URLs as public Dropbox share links. -> [!CAUTION] -> Ongoing updates to MapSwipe and Dropbox may render this script **out-of-date**. +- **v1** — `generate_coco_from_dropbox.py`: builds a minimal COCO file (`{ "images": [...] }`) from a **flat folder of images**. No annotations, no categories. Useful when you only need to register images for mapping. +- **v2** — `generate_coco_with_annotations_from_dropbox.py`: builds a full COCO file (`images`, `annotations`, `categories`) from an **images folder paired with a CSV metadata file** that describes bounding-box annotations per image. Useful when you already have annotations and want to seed an Assess Images project with them. -Utility script: [`generate_coco_from_dropbox.py`](generate_coco_from_dropbox.py) +> [!CAUTION] +> Ongoing updates to MapSwipe and Dropbox may render these scripts **out-of-date**. > For the Google Drive equivalent see [Generate COCO File from Google Drive](../generate-coco-from-drive/README.md). -## Prerequisites +## Common Prerequisites + +These apply to both scripts. - A Dropbox account: . - A new Dropbox app: . @@ -33,9 +36,18 @@ Utility script: [`generate_coco_from_dropbox.py`](generate_coco_from_dropbox.py) - `sharing.read` - A generated access token (from the app settings → **Generated access token**). - [uv](https://docs.astral.sh/uv/getting-started/installation/) installed. -- A Dropbox folder containing the images to be exported. -## Creation Steps +## v1 — From a Flat Image Directory + +Utility script: [`generate_coco_from_dropbox.py`](generate_coco_from_dropbox.py) + +Produces a minimal COCO file containing only an `images` array. Each image is given a public Dropbox share link as its `coco_url`. The resulting JSON is uploaded back into the same Dropbox folder. + +### Additional Prerequisites + +- A Dropbox folder containing the images to be exported (`.jpg`, `.jpeg`, `.png`, or `.webp`). The folder is listed **non-recursively** — only files in the top level are picked up. + +### Creation Steps 1. Copy the folder path in Dropbox. 2. Copy the generated access token from Dropbox. @@ -52,3 +64,72 @@ Utility script: [`generate_coco_from_dropbox.py`](generate_coco_from_dropbox.py) uv run generate_coco_from_dropbox.py sl.yourAccessTokenHere "/COCO TEST" "coco_export.json" ``` 4. Download the exported COCO JSON from the link printed in the terminal, or directly from your Dropbox folder. + +## v2 — From an Images Directory with a Metadata CSV + +Utility script: [`generate_coco_with_annotations_from_dropbox.py`](generate_coco_with_annotations_from_dropbox.py) + +Produces a full COCO file (`images`, `annotations`, `categories`) by combining an images sub-folder with a CSV metadata file that describes one bounding-box annotation per row. Image URLs are resolved to public Dropbox share links. The output is written locally; uploading back to Dropbox is opt-in. + +### Additional Prerequisites + +- A Dropbox folder containing: + - An images sub-folder (default: `images/`) with `.jpg`, `.jpeg`, `.png`, or `.webp` files. The folder is listed **recursively**. + - A CSV metadata file (default: `metadata.csv`) with at least these columns: + - `image_name` — image path relative to the images folder. + - `boxes` — JSON-encoded 4-element list (e.g. `[x1, y1, x2, y2]`). + - `type` — category name for the annotation. + + Additional columns are preserved as per-annotation `attributes`. All column names are configurable via flags. + +### Creation Steps + +1. Copy the folder path in Dropbox (the one containing the images sub-folder and the CSV). +2. Copy the generated access token from Dropbox and export it: + + ```bash + export DROPBOX_ACCESS_TOKEN="sl.yourAccessTokenHere" + ``` +3. Run the script: + + ```bash + # Help + uv run generate_coco_with_annotations_from_dropbox.py --help + + # Usage + uv run generate_coco_with_annotations_from_dropbox.py "FOLDER_PATHNAME_IN_DROPBOX" [options] + + # Example + uv run generate_coco_with_annotations_from_dropbox.py "/Indonesia" + + # Example with custom columns, lenient mode, image-dimension probing, and upload + uv run generate_coco_with_annotations_from_dropbox.py "/Indonesia" \ + --output ./indonesia.coco.json \ + --images-path images/ \ + --metadata-csv-path metadata.csv \ + --category-column type \ + --image-name-column image_name \ + --boxes-column boxes \ + --box-format xyxy \ + --lenient \ + --probe-dimensions \ + --upload-to-dropbox + ``` +4. Download the exported COCO JSON from the local path printed in the terminal (default: `./.coco.json`). If `--upload-to-dropbox` was used, a temporary Dropbox link is also printed. + +### Options + +| Flag | Default | Purpose | +| --- | --- | --- | +| `--output` | `./.coco.json` | Local path to write the COCO JSON. | +| `--images-path` | `images/` | Images folder, relative to the base folder (absolute Dropbox paths also accepted). | +| `--metadata-csv-path` | `metadata.csv` | CSV file path, relative to the base folder (absolute also accepted). | +| `--category-column` | `type` | CSV column used to derive COCO categories. | +| `--image-name-column` | `image_name` | CSV column with the image path relative to `--images-path`. | +| `--boxes-column` | `boxes` | CSV column with the bounding box (JSON-encoded 4-element list). | +| `--box-format` | `xyxy` | Format of the values in `--boxes-column`. `xyxy` is converted to COCO `xywh` on output. | +| `--annotation-id-column` | _(unset)_ | CSV column to use as annotation id (must be unique). Defaults to sequential 1..N. | +| `--image-id-column` | _(unset)_ | CSV column to use as image id (must be consistent per image). Defaults to sequential 1..N. | +| `--strict` / `--lenient` | `--strict` | Hard-error vs. skip-and-warn on rows referencing missing images or invalid boxes. | +| `--probe-dimensions` | `false` | Download each image and probe `width`/`height` with Pillow. | +| `--upload-to-dropbox` | `false` | Also upload the COCO JSON to `/` in Dropbox. | diff --git a/examples/generate-coco-from-dropbox/generate_coco_with_annotations_from_dropbox.py b/examples/generate-coco-from-dropbox/generate_coco_with_annotations_from_dropbox.py index ca6115b..83710c9 100644 --- a/examples/generate-coco-from-dropbox/generate_coco_with_annotations_from_dropbox.py +++ b/examples/generate-coco-from-dropbox/generate_coco_with_annotations_from_dropbox.py @@ -6,26 +6,29 @@ # "pillow", # ] # /// -from __future__ import annotations - import argparse import csv import io import json import os -import re import sys import textwrap from collections import OrderedDict -from pathlib import PurePosixPath +from collections.abc import Callable, Iterable +from dataclasses import dataclass, field +from pathlib import Path, PurePosixPath +from typing import Any, Literal, NoReturn, Self, TypedDict, cast +from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit import httpx -from colorama import init as colorama_init, Fore +from colorama import Fore +from colorama import init as colorama_init +from PIL import Image colorama_init(autoreset=True) - ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"} +UNKNOWN_CATEGORY = "unknown" DROPBOX_PERMISSION_MESSAGE = f""" {Fore.YELLOW} @@ -39,136 +42,329 @@ """ -def die(msg: str, code: int = 2) -> None: +# --------------------------------------------------------------------------- +# Dropbox API: typed request/response shapes +# --------------------------------------------------------------------------- + +# ".tag" isn't a valid Python identifier, so use the functional TypedDict form. +# We only model the file-entry shape: callers must filter ".tag" == "file" +# before accessing the path/name fields. +FileEntry = TypedDict( + "FileEntry", + { + ".tag": Literal["file", "folder", "deleted"], + "name": str, + "path_display": str, + "path_lower": str, + }, +) + + +class ListFolderResponse(TypedDict): + entries: list[FileEntry] + cursor: str + has_more: bool + + +class SharedLink(TypedDict): + url: str + + +class ListSharedLinksResponse(TypedDict): + links: list[SharedLink] + + +class CreateSharedLinkResponse(TypedDict): + url: str + + +class TemporaryLinkResponse(TypedDict): + link: str + + +class UploadResponse(TypedDict, total=False): + id: str + name: str + path_display: str + + +# --------------------------------------------------------------------------- +# COCO output shapes +# --------------------------------------------------------------------------- + + +type CocoId = int | str + + +class CocoCategory(TypedDict): + id: int + name: str + + +class _CocoImageBase(TypedDict): + id: CocoId + file_name: str + coco_url: str + + +class CocoImage(_CocoImageBase, total=False): + width: int + height: int + + +class CocoAnnotation(TypedDict): + id: CocoId + image_id: CocoId + bbox: list[float] + category_id: int + attributes: dict[str, str] + + +class CocoDocument(TypedDict): + images: list[CocoImage] + annotations: list[CocoAnnotation] + categories: list[CocoCategory] + + +# --------------------------------------------------------------------------- +# Errors +# --------------------------------------------------------------------------- + + +def die(msg: str, code: int = 2) -> NoReturn: print(f"{Fore.RED}{msg}", file=sys.stderr) sys.exit(code) -def dropbox_request_error_handler(res: httpx.Response) -> None: - try: - res.raise_for_status() - except httpx.HTTPStatusError as http_err: - print(f"{Fore.RED}HTTP error occurred while requesting {res.url}: {http_err}") - print(f"{Fore.RED}Response content: {res.text}") - print(DROPBOX_PERMISSION_MESSAGE) - raise - except httpx.RequestError as req_err: - print( - f"{Fore.RED}An error occurred while making the request to {res.url}: {req_err}" - ) - print(DROPBOX_PERMISSION_MESSAGE) - raise - - -def dropbox_rpc(endpoint: str, data: object, *, access_token: str) -> dict: - res = httpx.post( - f"https://api.dropboxapi.com/2/{endpoint}", - headers={ - "Authorization": f"Bearer {access_token}", - "Content-Type": "application/json", - }, - data=json.dumps(data), - ) - dropbox_request_error_handler(res) - return res.json() +# --------------------------------------------------------------------------- +# Dropbox client +# --------------------------------------------------------------------------- -def dropbox_download(path: str, *, access_token: str) -> bytes: - res = httpx.post( - "https://content.dropboxapi.com/2/files/download", - headers={ - "Authorization": f"Bearer {access_token}", - "Dropbox-API-Arg": json.dumps({"path": path}), - }, - ) - dropbox_request_error_handler(res) - return res.content +class DropboxClient: + """Thin Dropbox v2 client. Reuses one HTTP connection pool.""" + API = "https://api.dropboxapi.com/2" + CONTENT = "https://content.dropboxapi.com/2" -def dropbox_upload(path: str, content: bytes, *, access_token: str) -> dict: - res = httpx.post( - "https://content.dropboxapi.com/2/files/upload", - headers={ - "Authorization": f"Bearer {access_token}", - "Content-Type": "application/octet-stream", - "Dropbox-API-Arg": json.dumps( - { - "path": path, - "mode": "overwrite", - "autorename": False, - "mute": False, - } + def __init__(self, access_token: str) -> None: + self._token = access_token + self._client = httpx.Client(timeout=httpx.Timeout(60.0)) + + def close(self) -> None: + self._client.close() + + def __enter__(self) -> Self: + return self + + def __exit__(self, *_: object) -> None: + self.close() + + def _raise(self, res: httpx.Response) -> None: + try: + res.raise_for_status() + except httpx.HTTPStatusError as err: + request = res.request + body = ( + request.content.decode("utf-8", errors="replace") + if request.content + else "" + ) + api_arg = request.headers.get("Dropbox-API-Arg", "") + print(f"{Fore.RED}HTTP error occurred while requesting {res.url}: {err}") + if body: + print(f"{Fore.RED}Request body: {body}") + if api_arg: + print(f"{Fore.RED}Dropbox-API-Arg: {api_arg}") + print(f"{Fore.RED}Response content: {res.text}") + print(DROPBOX_PERMISSION_MESSAGE) + raise + except httpx.RequestError as err: + print( + f"{Fore.RED}An error occurred while making the request to " + f"{res.url}: {err}" + ) + print(DROPBOX_PERMISSION_MESSAGE) + raise + + def _post(self, endpoint: str, data: object) -> dict: + res = self._client.post( + f"{self.API}/{endpoint}", + headers={ + "Authorization": f"Bearer {self._token}", + "Content-Type": "application/json", + }, + content=json.dumps(data), + ) + self._raise(res) + return res.json() + + # -- typed wrappers -- + + def _list_folder( + self, path: PurePosixPath, *, recursive: bool + ) -> ListFolderResponse: + return cast( + ListFolderResponse, + self._post( + "files/list_folder", + {"path": str(path), "recursive": recursive}, ), - }, - content=content, - ) - dropbox_request_error_handler(res) - return res.json() - - -def list_all_files( - folder_path: str, *, access_token: str, recursive: bool -) -> list[dict]: - entries: list[dict] = [] - response = dropbox_rpc( - "files/list_folder", - {"path": folder_path, "recursive": recursive}, - access_token=access_token, - ) - entries.extend(response.get("entries", [])) - while response.get("has_more", False): - response = dropbox_rpc( - "files/list_folder/continue", - {"cursor": response["cursor"]}, - access_token=access_token, ) + + def _list_folder_continue(self, cursor: str) -> ListFolderResponse: + return cast( + ListFolderResponse, + self._post("files/list_folder/continue", {"cursor": cursor}), + ) + + def list_all_files( + self, + folder: PurePosixPath, + *, + recursive: bool, + sort_key: Callable[[FileEntry], Any] | None = None, + ) -> list[FileEntry]: + entries: list[FileEntry] = [] + response = self._list_folder(folder, recursive=recursive) entries.extend(response.get("entries", [])) - return entries - - -def collect_images(folder_path: str, *, access_token: str) -> list[dict]: - entries = list_all_files(folder_path, access_token=access_token, recursive=True) - images = [ - e - for e in entries - if e.get(".tag") == "file" - and PurePosixPath(e["name"]).suffix.lower() in ALLOWED_EXTENSIONS - ] - images.sort(key=lambda e: e["path_display"].lower()) - return images - - -def relative_under(path_display: str, base: str) -> str: - base_norm = base.rstrip("/").lower() - file_norm = path_display.lower() - prefix = base_norm + "/" - if file_norm.startswith(prefix): - return path_display[len(prefix):] - if file_norm == base_norm: - return "" - # Fallback — shouldn't happen given Dropbox listing semantics. - return path_display.lstrip("/") - - -def share_file_get_url(path: str, *, access_token: str) -> str: - res = dropbox_rpc( - "sharing/list_shared_links", - {"path": path, "direct_only": True}, - access_token=access_token, - ) - if res.get("links"): - link = res["links"][0]["url"] - else: - res_create = dropbox_rpc( - "sharing/create_shared_link_with_settings", - {"path": path, "settings": {"requested_visibility": "public"}}, - access_token=access_token, + while response.get("has_more", False): + response = self._list_folder_continue(response["cursor"]) + entries.extend(response.get("entries", [])) + if sort_key is not None: + entries.sort(key=sort_key) + return entries + + def list_all_images( + self, + folder: PurePosixPath, + *, + recursive: bool = True, + sort_key: Callable[[FileEntry], Any] | None = None, + ) -> list[FileEntry]: + entries = self.list_all_files(folder, recursive=recursive) + images = [ + e + for e in entries + if e.get(".tag") == "file" + and PurePosixPath(e["name"]).suffix.lower() in ALLOWED_EXTENSIONS + ] + if sort_key is not None: + images.sort(key=sort_key) + return images + + def list_shared_links(self, path: PurePosixPath) -> ListSharedLinksResponse: + return cast( + ListSharedLinksResponse, + self._post( + "sharing/list_shared_links", + {"path": str(path), "direct_only": True}, + ), + ) + + def create_shared_link(self, path: PurePosixPath) -> CreateSharedLinkResponse: + return cast( + CreateSharedLinkResponse, + self._post( + "sharing/create_shared_link_with_settings", + {"path": str(path), "settings": {"requested_visibility": "public"}}, + ), + ) + + def get_temporary_link(self, path: PurePosixPath) -> TemporaryLinkResponse: + return cast( + TemporaryLinkResponse, + self._post("files/get_temporary_link", {"path": str(path)}), + ) + + def download(self, path: PurePosixPath) -> bytes: + res = self._client.post( + f"{self.CONTENT}/files/download", + headers={ + "Authorization": f"Bearer {self._token}", + "Dropbox-API-Arg": json.dumps({"path": str(path)}), + }, ) - link = res_create["url"] - return re.sub(r"&dl=0\b", "", link) + "&raw=1" + self._raise(res) + return res.content + + def upload(self, path: PurePosixPath, content: bytes) -> UploadResponse: + res = self._client.post( + f"{self.CONTENT}/files/upload", + headers={ + "Authorization": f"Bearer {self._token}", + "Content-Type": "application/octet-stream", + "Dropbox-API-Arg": json.dumps( + { + "path": str(path), + "mode": "overwrite", + "autorename": False, + "mute": False, + } + ), + }, + content=content, + ) + self._raise(res) + return cast(UploadResponse, res.json()) + + @staticmethod + def to_raw_url(url: str) -> str: + """Convert a Dropbox preview URL to a direct-content URL (raw=1).""" + parts = urlsplit(url) + query = [ + (k, v) + for k, v in parse_qsl(parts.query, keep_blank_values=True) + if k != "dl" + ] + query.append(("raw", "1")) + return urlunsplit(parts._replace(query=urlencode(query))) + def get_or_create_shareable_url(self, path: PurePosixPath) -> str: + """Return a raw shared URL for `path`, creating the share link if needed.""" + res = self.list_shared_links(path) + links = res.get("links") or [] + if links: + url = links[0]["url"] + else: + url = self.create_shared_link(path)["url"] + return self.to_raw_url(url) + + +# --------------------------------------------------------------------------- +# Path helpers +# --------------------------------------------------------------------------- + + +def resolve_path(base_folder: PurePosixPath, sub: str) -> PurePosixPath: + """Join `sub` under `base_folder`, unless `sub` is already absolute.""" + p = PurePosixPath(sub) + return p if p.is_absolute() else base_folder / sub + + +# --------------------------------------------------------------------------- +# Args +# --------------------------------------------------------------------------- -def parse_args() -> argparse.Namespace: + +@dataclass(frozen=True) +class Args: + base_folder: PurePosixPath + output_path: Path + images_path_dropbox: PurePosixPath + metadata_csv_path_dropbox: PurePosixPath + category_column: str + image_name_column: str + boxes_column: str + box_format: Literal["xyxy", "xywh"] + annotation_id_column: str | None + image_id_column: str | None + is_strict: bool + should_probe_dimensions: bool + should_upload_to_dropbox: bool + + +def parse_args() -> Args: parser = argparse.ArgumentParser( description=( "Generate a COCO file with annotations from a Dropbox folder " @@ -187,12 +383,12 @@ def parse_args() -> argparse.Namespace: help="Local path to write the COCO JSON. Default: ./.coco.json", ) parser.add_argument( - "--images-folder", + "--images-path", default="images/", help="Images folder, relative to base_folder (absolute Dropbox path also accepted). Default: images/", ) parser.add_argument( - "--csv-path", + "--metadata-csv-path", default="metadata.csv", help="CSV file path, relative to base_folder (absolute also accepted). Default: metadata.csv", ) @@ -204,7 +400,7 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--image-name-column", default="image_name", - help="CSV column with the image path relative to --images-folder. Default: image_name", + help="CSV column with the image path relative to --images-path. Default: image_name", ) parser.add_argument( "--boxes-column", @@ -227,12 +423,6 @@ def parse_args() -> argparse.Namespace: default=None, help="CSV column to use as image id (must be consistent per image). Default: sequential 1..N", ) - parser.add_argument( - "--include-empty-images", - action=argparse.BooleanOptionalAction, - default=True, - help="Include images in images[] even when they have no CSV row. Default: include", - ) strictness = parser.add_mutually_exclusive_group() strictness.add_argument( "--strict", @@ -257,270 +447,630 @@ def parse_args() -> argparse.Namespace: action="store_true", help="Also upload the COCO JSON to / in Dropbox.", ) - return parser.parse_args() + ns = parser.parse_args() + base_folder = PurePosixPath(ns.base_folder.rstrip("/") or "/") + images_path_dropbox = resolve_path(base_folder, ns.images_path) + metadata_csv_path_dropbox = resolve_path(base_folder, ns.metadata_csv_path) + output_path = Path( + ns.output or f"./{base_folder.name or 'untitled'}.coco.json" + ) -def resolve_dbx_path(base_folder: str, sub: str) -> str: - # If sub is an absolute Dropbox path, return as-is; otherwise join under base. - if sub.startswith("/"): - return str(PurePosixPath(sub)) - return str(PurePosixPath(base_folder) / sub) + return Args( + base_folder=base_folder, + output_path=output_path, + images_path_dropbox=images_path_dropbox, + metadata_csv_path_dropbox=metadata_csv_path_dropbox, + category_column=ns.category_column, + image_name_column=ns.image_name_column, + boxes_column=ns.boxes_column, + box_format=ns.box_format, + annotation_id_column=ns.annotation_id_column, + image_id_column=ns.image_id_column, + is_strict=ns.strict, + should_probe_dimensions=ns.probe_dimensions, + should_upload_to_dropbox=ns.upload_to_dropbox, + ) -def coerce_id(raw: str) -> int | str: +# --------------------------------------------------------------------------- +# Dropbox-side loaders / writers +# --------------------------------------------------------------------------- + + +def list_dropbox_image_index( + client: DropboxClient, + folder: PurePosixPath, + base_folder: PurePosixPath, +) -> dict[PurePosixPath, FileEntry]: + """List images under `folder`, keyed by their path relative to `base_folder`.""" + print(f"Listing images under {folder} ...") + file_entries = client.list_all_images( + folder, sort_key=lambda e: e["path_display"].lower() + ) + print(f" {len(file_entries)} image(s) found") + + base_depth = len(base_folder.parts) + index: dict[PurePosixPath, FileEntry] = OrderedDict() + for entry in file_entries: + rel = PurePosixPath( + *PurePosixPath(entry["path_display"]).parts[base_depth:] + ) + if rel in index: + die(f"Duplicate relative path {str(rel)!r} under {base_folder}.") + index[rel] = entry + return index + + +def load_dropbox_csv_rows( + client: DropboxClient, csv_path: PurePosixPath +) -> tuple[list[dict[str, str]], list[str]]: + """Download a CSV from Dropbox and parse it into row dicts + headers. + + Every field is normalised to a stripped string — never None — so callers + can use plain `row[col]` access without further defensive shaping. + """ + print(f"Downloading CSV from {csv_path} ...") + csv_bytes = client.download(csv_path) + reader = csv.DictReader(io.StringIO(csv_bytes.decode("utf-8-sig"))) + rows = [{k: (v or "").strip() for k, v in row.items()} for row in reader] + headers = list(reader.fieldnames or []) + print(f" {len(rows)} CSV row(s); columns: {headers}") + return rows, headers + + +def write_output(coco: CocoDocument, output_path: Path) -> None: + with output_path.open("w") as f: + json.dump(coco, f, indent=2) + print(f"{Fore.GREEN}Wrote {output_path}") + + +def upload_coco_to_dropbox( + client: DropboxClient, + coco: CocoDocument, + base_folder: PurePosixPath, + output_path: Path, +) -> str: + dest = base_folder / output_path.name + print(f"Uploading COCO file to {dest} ...") + client.upload(dest, json.dumps(coco).encode("utf-8")) + link = client.get_temporary_link(dest)["link"] + print(f"{Fore.GREEN}Uploaded to {dest}") + print(f"{Fore.GREEN}Temporary link: {link}") + return link + + +# --------------------------------------------------------------------------- +# CocoBuilder (pure: no CSV / Dropbox knowledge) +# --------------------------------------------------------------------------- + + +def coerce_id(raw: str) -> CocoId: + # Only integer-looking strings coerce; "1.0" stays as the string "1.0". raw = raw.strip() if raw.lstrip("-").isdigit(): return int(raw) return raw -def main() -> None: - args = parse_args() - - access_token = os.environ.get("DROPBOX_ACCESS_TOKEN") - if not access_token: - die("DROPBOX_ACCESS_TOKEN env var is not set.") - - base_folder = args.base_folder.rstrip("/") or "/" - images_folder_dbx = resolve_dbx_path(base_folder, args.images_folder).rstrip("/") - csv_path_dbx = resolve_dbx_path(base_folder, args.csv_path) +@dataclass(frozen=True) +class BuilderOptions: + """Column mappings and output toggles consumed by CocoBuilder.""" - output_path = ( - args.output - or f"./{PurePosixPath(base_folder).name or 'coco'}.coco.json" - ) + category_column: str + image_name_column: str + boxes_column: str + box_format: Literal["xyxy", "xywh"] + annotation_id_column: str | None + image_id_column: str | None + is_strict: bool - # 1. List images - print(f"Listing images under {images_folder_dbx} ...") - file_entries = collect_images(images_folder_dbx, access_token=access_token) - print(f" {len(file_entries)} image(s) found") + @classmethod + def from_args(cls, args: Args) -> Self: + return cls( + category_column=args.category_column, + image_name_column=args.image_name_column, + boxes_column=args.boxes_column, + box_format=args.box_format, + annotation_id_column=args.annotation_id_column, + image_id_column=args.image_id_column, + is_strict=args.is_strict, + ) - image_by_relpath: dict[str, dict] = OrderedDict() - for entry in file_entries: - rel = relative_under(entry["path_display"], images_folder_dbx) - if not rel: - continue - if rel in image_by_relpath: - die(f"Duplicate relative path {rel!r} under {images_folder_dbx}.") - image_by_relpath[rel] = entry - - # 2. Download CSV - print(f"Downloading CSV from {csv_path_dbx} ...") - csv_bytes = dropbox_download(csv_path_dbx, access_token=access_token) - csv_reader = csv.DictReader(io.StringIO(csv_bytes.decode("utf-8-sig"))) - rows = list(csv_reader) - headers = list(csv_reader.fieldnames or []) - print(f" {len(rows)} CSV row(s); columns: {headers}") - required_cols = { +def ensure_required_fields(fields: list[str], args: Args) -> None: + """Hard-error if `fields` is missing any field referenced by `args`.""" + required = { args.image_name_column, args.boxes_column, args.category_column, } if args.annotation_id_column: - required_cols.add(args.annotation_id_column) + required.add(args.annotation_id_column) if args.image_id_column: - required_cols.add(args.image_id_column) - missing = required_cols - set(headers) + required.add(args.image_id_column) + missing = required - set(fields) if missing: - die(f"CSV is missing required columns: {sorted(missing)}") - - # 3. Validate image references - unmatched: list[tuple[int, str]] = [] - for row_idx, row in enumerate(rows, start=1): - name = (row.get(args.image_name_column) or "").strip() - if name not in image_by_relpath: - unmatched.append((row_idx, name)) - if unmatched: - preview = "\n".join(f" row {idx}: {name!r}" for idx, name in unmatched[:20]) + die(f"Input is missing required fields: {sorted(missing)}") + + +@dataclass +class _Item: + """An input row plus per-item state set during validation.""" + + fields: dict[str, str] + skipped: bool = False + # COCO-format (xywh) bbox; populated by `validate_bboxes` for non-skipped items. + parsed_bbox: list[float] = field(default_factory=list) + # Populated by `validate_annotation_ids` for non-skipped items. + annotation_id: CocoId | None = None + + +class CocoBuilder: + """Build a COCO document from in-memory items + an image relative_path set. + + The builder is agnostic to data sources. Callers provide: + - `items` / `fields`: item dicts and the set of field names (typically + CSV rows + headers, but any dict-shape works) + - `image_relative_paths`: relative paths of known images (typically from + a listing) + - `resolve_image_url(relative_path) -> str`: returns the COCO url for an + image + - `probe_image_dimensions(relative_path) -> (w, h)`: optional; enables + width/height + - `image_source_label`: human-readable label for error messages + """ + + def __init__( + self, + *, + options: BuilderOptions, + items: list[dict[str, str]], + fields: list[str], + image_relative_paths: Iterable[PurePosixPath], + resolve_image_url: Callable[[PurePosixPath], str], + probe_image_dimensions: Callable[[PurePosixPath], tuple[int, int]] + | None = None, + image_source_label: str, + ) -> None: + self._options = options + # Items keep their original 1-based position; the `skipped` flag is + # set during validation, so item_idx in errors always refers to the + # original input position. + self._items: list[_Item] = [_Item(fields=item) for item in items] + self._fields = list(fields) + # Dict-as-ordered-set: keeps listing order, gives O(1) membership. + self._image_relative_paths: dict[PurePosixPath, None] = dict.fromkeys( + image_relative_paths + ) + self._resolve_image_url = resolve_image_url + self._probe_image_dimensions = probe_image_dimensions + self._image_source_label = image_source_label + + def validate_image_references(self) -> None: + """Flag items whose image_name has no matching image (or hard-error in strict).""" + opts = self._options + unmatched: list[tuple[int, str]] = [] + for item_idx, item in enumerate(self._items, start=1): + if item.skipped: + continue + name = item.fields[opts.image_name_column] + if not name: + item.skipped = True + elif PurePosixPath(name) not in self._image_relative_paths: + unmatched.append((item_idx, name)) + item.skipped = True + if not unmatched: + return + + preview = "\n".join(f" item {idx}: {name!r}" for idx, name in unmatched[:20]) more = ( f"\n ... ({len(unmatched) - 20} more)" if len(unmatched) > 20 else "" ) msg = ( - f"{len(unmatched)} CSV row(s) reference images not found under " - f"{images_folder_dbx}:\n{preview}{more}" + f"{len(unmatched)} item(s) reference images not found in " + f"{self._image_source_label}:\n{preview}{more}" ) - if args.strict: + if opts.is_strict: die( - f"{msg}\nAborting (--strict). Re-run with --lenient to skip these rows." + f"{msg}\nAborting (--strict). Re-run with --lenient to skip these items." ) print(f"{Fore.YELLOW}WARNING: {msg}") - skip = {n for _, n in unmatched} - rows = [ - r for r in rows if (r.get(args.image_name_column) or "").strip() not in skip - ] - # 4. Build categories[] (id by order of first appearance). - # Empty values are routed to an "unknown" category, added on first occurrence. - UNKNOWN_NAME = "unknown" - categories: list[dict] = [] - cat_id_by_name: dict[str, int] = {} - unknown_count = 0 - for row in rows: - raw = (row.get(args.category_column) or "").strip() - name = raw or UNKNOWN_NAME - if not raw: - unknown_count += 1 - if name not in cat_id_by_name: - cat_id_by_name[name] = len(categories) + 1 - categories.append({"id": cat_id_by_name[name], "name": name}) - if unknown_count: - print( - f"{Fore.YELLOW}WARNING: {unknown_count} row(s) had empty " - f"{args.category_column!r}; routed to category {UNKNOWN_NAME!r}." + def build_categories(self) -> tuple[list[CocoCategory], dict[str, int]]: + category_column = self._options.category_column + categories: list[CocoCategory] = [] + category_id_by_name: dict[str, int] = {} + unknown_count = 0 + for item in self._items: + if item.skipped: + continue + raw = item.fields[category_column] + name = raw or UNKNOWN_CATEGORY + if not raw: + unknown_count += 1 + if name not in category_id_by_name: + category_id_by_name[name] = len(categories) + 1 + categories.append({"id": category_id_by_name[name], "name": name}) + if unknown_count: + print( + f"{Fore.YELLOW}WARNING: {unknown_count} item(s) had empty " + f"{category_column!r}; routed to category {UNKNOWN_CATEGORY!r}." + ) + return categories, category_id_by_name + + def validate_bboxes(self) -> None: + """Parse + convert each item's bbox; flag invalid ones (or hard-error in strict).""" + opts = self._options + invalid: list[tuple[int, str]] = [] + for item_idx, item in enumerate(self._items, start=1): + if item.skipped: + continue + raw_box = item.fields[opts.boxes_column] + try: + box = json.loads(raw_box) + except (json.JSONDecodeError, TypeError) as exc: + invalid.append((item_idx, f"failed to parse as JSON: {exc}")) + item.skipped = True + continue + if not isinstance(box, list) or len(box) != 4: + invalid.append((item_idx, f"not a 4-element list: {box!r}")) + item.skipped = True + continue + if opts.box_format == "xyxy": + x1, y1, x2, y2 = box + item.parsed_bbox = [x1, y1, x2 - x1, y2 - y1] + else: + item.parsed_bbox = list(box) + if not invalid: + return + + preview = "\n".join( + f" item {idx}: {opts.boxes_column!r} {err}" + for idx, err in invalid[:20] + ) + more = ( + f"\n ... ({len(invalid) - 20} more)" if len(invalid) > 20 else "" + ) + msg = ( + f"{len(invalid)} item(s) have invalid {opts.boxes_column!r}:\n" + f"{preview}{more}" ) + if opts.is_strict: + die( + f"{msg}\nAborting (--strict). Re-run with --lenient to skip invalid items." + ) + print(f"{Fore.YELLOW}WARNING: {msg}") - # 6. Assign image ids - rows_by_image: dict[str, list[dict]] = {} - for row in rows: - rows_by_image.setdefault(row[args.image_name_column].strip(), []).append(row) + def assign_image_ids(self) -> dict[PurePosixPath, CocoId]: + opts = self._options + image_id_by_path: dict[PurePosixPath, CocoId] = {} - image_id_by_relpath: dict[str, int | str] = {} - if args.image_id_column: - for relpath, row_list in rows_by_image.items(): - ids = {r[args.image_id_column].strip() for r in row_list} - if len(ids) > 1: + if not opts.image_id_column: + # Group items by image_name and assign 1..N to each unique image + # in order of first appearance. We deliberately walk every item + # (including skipped ones) so an image's id is stable regardless + # of which items happen to be skipped on a given run. + next_id = 1 + for item in self._items: + path = PurePosixPath(item.fields[opts.image_name_column]) + if path not in image_id_by_path: + image_id_by_path[path] = next_id + next_id += 1 + return image_id_by_path + + # Many items may share an image; all such items must declare the same + # image_id, and no two distinct images may claim the same id. + path_by_id: dict[CocoId, PurePosixPath] = {} + inconsistent: list[tuple[int, str, CocoId, CocoId]] = [] + reused_id: list[tuple[int, CocoId, str, str]] = [] + for item_idx, item in enumerate(self._items, start=1): + if item.skipped: + continue + path = PurePosixPath(item.fields[opts.image_name_column]) + new_id = coerce_id(item.fields[opts.image_id_column]) + + existing_id = image_id_by_path.get(path) + if existing_id is not None: + if existing_id != new_id: + inconsistent.append((item_idx, str(path), new_id, existing_id)) + item.skipped = True + continue # path already mapped; consistent items just share it + + existing_path = path_by_id.get(new_id) + if existing_path is not None: + reused_id.append((item_idx, new_id, str(path), str(existing_path))) + item.skipped = True + continue + + image_id_by_path[path] = new_id + path_by_id[new_id] = path + + if inconsistent: + preview = "\n".join( + f" item {idx}: {p!r} declares {opts.image_id_column!r}={n!r}, " + f"but {e!r} was established earlier" + for idx, p, n, e in inconsistent[:20] + ) + more = ( + f"\n ... ({len(inconsistent) - 20} more)" + if len(inconsistent) > 20 + else "" + ) + msg = ( + f"{len(inconsistent)} item(s) declare an {opts.image_id_column!r} " + f"inconsistent with an earlier item for the same image:\n{preview}{more}" + ) + if opts.is_strict: die( - f"Image {relpath!r} has inconsistent values in " - f"{args.image_id_column!r}: {sorted(ids)}" + f"{msg}\nAborting (--strict). Re-run with --lenient to skip these items." ) - image_id_by_relpath[relpath] = coerce_id(next(iter(ids))) - used_int = [v for v in image_id_by_relpath.values() if isinstance(v, int)] - next_id = (max(used_int) + 1) if used_int else 1 - for relpath in image_by_relpath: - if relpath not in image_id_by_relpath: - image_id_by_relpath[relpath] = next_id - next_id += 1 - else: - for idx, relpath in enumerate(image_by_relpath.keys(), start=1): - image_id_by_relpath[relpath] = idx - - # 7. Drop empty images if not included - if not args.include_empty_images: - kept = {r[args.image_name_column].strip() for r in rows} - image_by_relpath = OrderedDict( - (rp, e) for rp, e in image_by_relpath.items() if rp in kept - ) + print(f"{Fore.YELLOW}WARNING: {msg}") - # 8. Share links + optional dimension probe - images_out: list[dict] = [] - total = len(image_by_relpath) - for i, (relpath, entry) in enumerate(image_by_relpath.items(), start=1): - print(f"{i}/{total} sharing {relpath}") - url = share_file_get_url(entry["path_lower"], access_token=access_token) - record: dict = { - "id": image_id_by_relpath[relpath], - "file_name": relpath, - "coco_url": url, - } - if args.probe_dimensions: - from PIL import Image # lazy import + if reused_id: + preview = "\n".join( + f" item {idx}: {opts.image_id_column!r} {i!r} " + f"(already used by {ep!r}, claimed by {p!r})" + for idx, i, p, ep in reused_id[:20] + ) + more = ( + f"\n ... ({len(reused_id) - 20} more)" + if len(reused_id) > 20 + else "" + ) + msg = ( + f"{len(reused_id)} item(s) reuse an {opts.image_id_column!r} " + f"already assigned to a different image:\n{preview}{more}" + ) + if opts.is_strict: + die( + f"{msg}\nAborting (--strict). Re-run with --lenient to skip these items." + ) + print(f"{Fore.YELLOW}WARNING: {msg}") - img_bytes = dropbox_download(entry["path_lower"], access_token=access_token) - with Image.open(io.BytesIO(img_bytes)) as im: - record["width"], record["height"] = im.size - images_out.append(record) - - # 9. Build annotations[] - attribute_columns = [ - c - for c in headers - if c - not in { - args.image_name_column, - args.boxes_column, - args.category_column, - args.annotation_id_column, - args.image_id_column, - } - ] - annotations_out: list[dict] = [] - seen_ann_ids: set = set() + return image_id_by_path - for row_idx, row in enumerate(rows, start=1): - relpath = row[args.image_name_column].strip() - image_id = image_id_by_relpath[relpath] + def validate_annotation_ids(self) -> None: + """Stash annotation_id on each non-skipped item. - raw_box = row[args.boxes_column] - try: - box = json.loads(raw_box) - except (json.JSONDecodeError, TypeError) as exc: - die( - f"Row {row_idx}: failed to parse {args.boxes_column!r} as JSON: {exc}" - ) - if not isinstance(box, list) or len(box) != 4: + If `annotation_id_column` is set, parse + dedup; otherwise auto-assign + each non-skipped item its 1-based item position so ids are stable + across runs (skipped positions leave gaps in the output). + """ + opts = self._options + if not opts.annotation_id_column: + for item_idx, item in enumerate(self._items, start=1): + if item.skipped: + continue + item.annotation_id = item_idx + return + + seen: dict[CocoId, int] = {} + duplicates: list[tuple[int, CocoId, int]] = [] + for item_idx, item in enumerate(self._items, start=1): + if item.skipped: + continue + annotation_id = coerce_id(item.fields[opts.annotation_id_column]) + existing_idx = seen.get(annotation_id) + if existing_idx is not None: + duplicates.append((item_idx, annotation_id, existing_idx)) + item.skipped = True + continue + seen[annotation_id] = item_idx + item.annotation_id = annotation_id + + if not duplicates: + return + + preview = "\n".join( + f" item {idx}: {opts.annotation_id_column!r} {dup_id!r} " + f"(already used by item {first})" + for idx, dup_id, first in duplicates[:20] + ) + more = ( + f"\n ... ({len(duplicates) - 20} more)" if len(duplicates) > 20 else "" + ) + msg = ( + f"{len(duplicates)} item(s) have a duplicate " + f"{opts.annotation_id_column!r}:\n{preview}{more}" + ) + if opts.is_strict: die( - f"Row {row_idx}: {args.boxes_column!r} is not a 4-element list: " - f"{box!r}" + f"{msg}\nAborting (--strict). Re-run with --lenient to skip duplicates." ) - if args.box_format == "xyxy": - x1, y1, x2, y2 = box - bbox = [x1, y1, x2 - x1, y2 - y1] - else: - bbox = list(box) + print(f"{Fore.YELLOW}WARNING: {msg}") - if args.annotation_id_column: - ann_id = coerce_id(row[args.annotation_id_column]) - if ann_id in seen_ann_ids: - die(f"Row {row_idx}: duplicate annotation id {ann_id!r}") - seen_ann_ids.add(ann_id) - else: - ann_id = len(annotations_out) + 1 - - cat_name = (row.get(args.category_column) or "").strip() or UNKNOWN_NAME - annotations_out.append( - { - "id": ann_id, - "image_id": image_id, - "bbox": bbox, - "category_id": cat_id_by_name[cat_name], - "attributes": {col: row[col] for col in attribute_columns}, + def get_used_image_relative_paths(self) -> dict[PurePosixPath, None]: + """Return `_image_relative_paths` filtered to images referenced by surviving items.""" + kept = { + PurePosixPath(item.fields[self._options.image_name_column]) + for item in self._items + if not item.skipped + } + return { + relative_path: None + for relative_path in self._image_relative_paths + if relative_path in kept + } + + def build_image_records( + self, + image_id_by_path: dict[PurePosixPath, CocoId], + image_relative_paths: dict[PurePosixPath, None], + ) -> list[CocoImage]: + images: list[CocoImage] = [] + total = len(image_relative_paths) + for idx, relative_path in enumerate(image_relative_paths, start=1): + print(f"{idx}/{total} resolving {relative_path}") + record: CocoImage = { + "id": image_id_by_path[relative_path], + "file_name": str(relative_path), + "coco_url": self._resolve_image_url(relative_path), } - ) + if self._probe_image_dimensions is not None: + record["width"], record["height"] = self._probe_image_dimensions( + relative_path + ) + images.append(record) + return images - coco = { - "images": images_out, - "annotations": annotations_out, - "categories": categories, - } + def build_annotations( + self, + image_id_by_path: dict[PurePosixPath, CocoId], + category_id_by_name: dict[str, int], + ) -> list[CocoAnnotation]: + opts = self._options + attribute_fields = [ + f + for f in self._fields + if f + not in { + opts.image_name_column, + opts.boxes_column, + opts.category_column, + opts.annotation_id_column, + opts.image_id_column, + } + ] + annotations: list[CocoAnnotation] = [] - # 10. Write locally - with open(output_path, "w") as f: - json.dump(coco, f, indent=2) - print(f"{Fore.GREEN}Wrote {output_path}") + for item in self._items: + if item.skipped: + continue + fields = item.fields + relative_path = PurePosixPath(fields[opts.image_name_column]) + image_id = image_id_by_path[relative_path] + annotation_id = cast(CocoId, item.annotation_id) - # 11. Optional Dropbox upload - dropbox_link: str | None = None - if args.upload_to_dropbox: - out_basename = PurePosixPath(output_path).name - dest = str(PurePosixPath(base_folder) / out_basename) - print(f"Uploading COCO file to {dest} ...") - dropbox_upload( - dest, json.dumps(coco).encode("utf-8"), access_token=access_token - ) - link_res = dropbox_rpc( - "files/get_temporary_link", {"path": dest}, access_token=access_token + category_name = fields[opts.category_column] or UNKNOWN_CATEGORY + annotations.append( + { + "id": annotation_id, + "image_id": image_id, + "bbox": item.parsed_bbox, + "category_id": category_id_by_name[category_name], + "attributes": {f: fields[f] for f in attribute_fields}, + } + ) + return annotations + + def build(self) -> CocoDocument: + self.validate_image_references() + self.validate_bboxes() + + image_id_by_path = self.assign_image_ids() + self.validate_annotation_ids() + used_image_relative_paths = self.get_used_image_relative_paths() + + images = self.build_image_records( + image_id_by_path, + used_image_relative_paths, ) - dropbox_link = link_res["link"] - print(f"{Fore.GREEN}Uploaded to {dest}") - print(f"{Fore.GREEN}Temporary link: {dropbox_link}") - - # 12. Summary - images_with_detections = len( - {r[args.image_name_column].strip() for r in rows} - & set(image_by_relpath.keys()) - ) + + categories, category_id_by_name = self.build_categories() + + annotations = self.build_annotations(image_id_by_path, category_id_by_name) + return { + "images": images, + "annotations": annotations, + "categories": categories, + } + + def count_images_with_detections(self) -> int: + """Number of images that have at least one item referencing them.""" + items_image_paths = { + PurePosixPath(item.fields[self._options.image_name_column]) + for item in self._items + if not item.skipped + } + return len(items_image_paths & self._image_relative_paths.keys()) + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + + +def print_summary( + coco: CocoDocument, + *, + images_with_detections: int, + output_path: Path, + dropbox_link: str | None, +) -> None: print() print(f"{Fore.CYAN}Summary:") - print(f" Categories: {len(categories)}") + print(f" Categories: {len(coco['categories'])}") print( - f" Images: {len(images_out)} ({images_with_detections} with detections)" + f" Images: {len(coco['images'])} " + f"({images_with_detections} with detections)" ) - print(f" Annotations: {len(annotations_out)}") + print(f" Annotations: {len(coco['annotations'])}") print(f" Output: {output_path}") if dropbox_link: print(f" Dropbox: {dropbox_link}") +def main() -> None: + args = parse_args() + + access_token = os.environ.get("DROPBOX_ACCESS_TOKEN") + if not access_token: + die("DROPBOX_ACCESS_TOKEN env var is not set.") + + with DropboxClient(access_token) as client: + # Cheap checks first: download CSV and validate its fields before + # paying for a recursive image listing. + rows, headers = load_dropbox_csv_rows( + client, args.metadata_csv_path_dropbox + ) + ensure_required_fields(headers, args) + + image_index = list_dropbox_image_index( + client, args.images_path_dropbox, args.base_folder + ) + + def resolve_image_url(relative_path: PurePosixPath) -> str: + return client.get_or_create_shareable_url( + PurePosixPath(image_index[relative_path]["path_lower"]) + ) + + def probe_image_dimensions(relative_path: PurePosixPath) -> tuple[int, int]: + img_bytes = client.download( + PurePosixPath(image_index[relative_path]["path_lower"]) + ) + with Image.open(io.BytesIO(img_bytes)) as im: + return im.size + + print(image_index.keys()) + + builder = CocoBuilder( + options=BuilderOptions.from_args(args), + items=rows, + fields=headers, + image_relative_paths=image_index.keys(), + resolve_image_url=resolve_image_url, + probe_image_dimensions=( + probe_image_dimensions if args.should_probe_dimensions else None + ), + image_source_label=str(args.base_folder), + ) + coco = builder.build() + + write_output(coco, args.output_path) + dropbox_link: str | None = None + if args.should_upload_to_dropbox: + dropbox_link = upload_coco_to_dropbox( + client, coco, args.base_folder, args.output_path + ) + + print_summary( + coco, + images_with_detections=builder.count_images_with_detections(), + output_path=args.output_path, + dropbox_link=dropbox_link, + ) + + if __name__ == "__main__": main()