From 6e29cf8a01c2bcebcb55caf2897ca26d163b8e2f Mon Sep 17 00:00:00 2001 From: bradjin8 Date: Wed, 10 Jun 2026 13:51:25 -0400 Subject: [PATCH 1/6] fix: initial implementation --- api/search.py | 505 +------------------- services/search.py | 567 +++++++++++++++++++++++ tests/test_models_wired_at_read_sites.py | 4 +- tests/test_search_helpers.py | 472 +++++++++++++++++++ 4 files changed, 1060 insertions(+), 488 deletions(-) create mode 100644 services/search.py create mode 100644 tests/test_search_helpers.py diff --git a/api/search.py b/api/search.py index ae35b09..3ad9c89 100644 --- a/api/search.py +++ b/api/search.py @@ -3,70 +3,23 @@ GET /api/search?q=...&type=all|chat|composer """ -import json import logging -import os -import re -import sqlite3 -from contextlib import closing -from datetime import datetime -from urllib.parse import unquote as _url_unquote from flask import Blueprint, current_app, jsonify, request -from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules -from utils.workspace_path import resolve_workspace_path, get_cli_chats_path -from utils.path_helpers import to_epoch_ms, warn_workspace_json_read -from utils.text_extract import extract_text_from_bubble -from utils.cli_chat_reader import list_cli_projects, traverse_blobs, messages_to_bubbles -from models import Bubble, Composer, ParseWarningCollector, SchemaError +from models import ParseWarningCollector +from services.search import ( + rank_results, + search_cli_sessions, + search_global_storage, + search_legacy_workspaces, +) +from utils.workspace_path import get_cli_chats_path, resolve_workspace_path bp = Blueprint("search", __name__) _logger = logging.getLogger(__name__) -def _json_dump_safe(value) -> str: - """Best-effort JSON string conversion for exclusion matching.""" - try: - return json.dumps(value, ensure_ascii=False, sort_keys=True) - except Exception: - return str(value) if value is not None else "" - - -def _workspace_display_name_from_folder(folder: str | None, 
fallback: str | None = None) -> str: - """Extract a human-readable workspace name from workspace folder path.""" - if folder: - raw = str(folder).strip() - cleaned = re.sub(r"^file://", "", raw).replace("\\", "/") - parts = cleaned.split("/") - leaf = parts[-1] if parts else "" - if leaf: - return _url_unquote(leaf) - return fallback or "Other chats" - - -def _build_exclusion_searchable( - *, - project_name: str | None, - chat_title: str | None, - model_names: list[str] | None = None, - content_parts: list[str] | None = None, - metadata_parts: list[str] | None = None, -) -> str: - """Build broad searchable text so exclusion rules cover visible output.""" - combined: list[str] = [] - if content_parts: - combined.extend(p for p in content_parts if p) - if metadata_parts: - combined.extend(p for p in metadata_parts if p) - return build_searchable_text( - project_name=project_name, - chat_title=chat_title, - model_names=model_names, - chat_content_snippet="\n\n".join(combined) if combined else None, - ) - - @bp.route("/api/search") def search(): try: @@ -78,442 +31,22 @@ def search(): return jsonify({"error": "No search query provided"}), 400 workspace_path = resolve_workspace_path() - results = [] parse_warnings = ParseWarningCollector() query_lower = query.lower() - global_db_path = os.path.normpath(os.path.join(workspace_path, "..", "globalStorage", "state.vscdb")) - - # --------------------------------------------------------------- - # Search global cursorDiskKV (new Cursor format — primary source) - # --------------------------------------------------------------- - if os.path.isfile(global_db_path): - # try/finally guarantees .close() on every exit path including - # exception (issue #17). Equivalent to wrapping the body in - # `with closing(sqlite3.connect(...))`, without the 160-line - # indent shift over the search logic that follows. 
- conn = None - try: - conn = sqlite3.connect(f"file:{global_db_path}?mode=ro", uri=True) - conn.row_factory = sqlite3.Row - - # Build workspace name map for display - workspace_entries = [] - ws_id_to_name = {} - try: - for name in os.listdir(workspace_path): - full = os.path.join(workspace_path, name) - wj = os.path.join(full, "workspace.json") - if os.path.isdir(full) and os.path.isfile(wj): - workspace_entries.append({"name": name, "workspaceJsonPath": wj}) - try: - with open(wj, "r", encoding="utf-8") as f: - wd = json.load(f) - first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") - if first_folder: - parts = first_folder.replace("\\", "/").split("/") - fn = parts[-1] if parts else None - if fn: - ws_id_to_name[name] = _url_unquote(fn) - except Exception as e: - warn_workspace_json_read(_logger, name, e) - except Exception as e: - _logger.warning( - "Failed to list workspace entries under %s: %s", - workspace_path, - e, - ) - - # Build composer → workspace mapping - composer_id_to_ws = {} - for entry in workspace_entries: - db_path = os.path.join(workspace_path, entry["name"], "state.vscdb") - if not os.path.isfile(db_path): - continue - try: - # closing() guarantees .close() on scope exit (issue #17). 
- with closing(sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)) as wconn: - row = wconn.execute( - "SELECT value FROM ItemTable WHERE [key] = 'composer.composerData'" - ).fetchone() - if row and row[0]: - data = json.loads(row[0]) - all_composers = data.get("allComposers") - if isinstance(all_composers, list): - for c in all_composers: - cid = c.get("composerId") if isinstance(c, dict) else None - if cid: - composer_id_to_ws[cid] = entry["name"] - except Exception as e: - _logger.warning( - "Failed to load composer mapping from workspace %s: %s", - entry["name"], - e, - ) - - # Load bubble text for searching - bubble_map = {} - for row in conn.execute("SELECT key, value FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'"): - parts = row["key"].split(":") - if len(parts) >= 3: - bid = parts[2] - try: - bubble = Bubble.from_dict(json.loads(row["value"]), bubble_id=bid) - text = extract_text_from_bubble(bubble) - bubble_map[bid] = {"text": text, "raw": bubble.raw} - except SchemaError as e: - # Drift logged so the operator can see why a chat dropped - # out of search results; bad row still skipped so search - # keeps returning results from the well-formed ones. 
- _logger.warning( - "Schema drift in bubble %s: %s (%s)", - bid, - e, - type(e).__name__, - ) - parse_warnings.record_bubble_skipped() - except (json.JSONDecodeError, TypeError, ValueError) as e: - _logger.warning( - "Failed to decode Bubble from bubbleId:%s: %s", - bid, - e, - ) - parse_warnings.record_bubble_skipped() - - # Search through composerData - composer_rows = conn.execute( - "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'composerData:%' AND LENGTH(value) > 10" - ).fetchall() - - for row in composer_rows: - composer_id = row["key"].split(":")[1] - try: - composer = Composer.from_dict(json.loads(row["value"]), composer_id=composer_id) - except SchemaError as e: - _logger.warning( - "Schema drift in composer %s: %s (%s)", - composer_id, - e, - type(e).__name__, - ) - parse_warnings.record_composer_skipped() - continue - except (json.JSONDecodeError, TypeError, ValueError) as e: - _logger.warning( - "Failed to decode Composer from composerData:%s: %s", - composer_id, - e, - ) - parse_warnings.record_composer_skipped() - continue - try: - cd = composer.raw - headers = composer.full_conversation_headers_only - if not headers: - continue - - title = composer.name or "" - ws_id = composer_id_to_ws.get(composer_id, "global") - ws_name = ws_id_to_name.get(ws_id) - project_name = ws_name or ("Other chats" if ws_id == "global" else ws_id) - - model_config = composer.model_config - model_name = model_config.get("modelName") - model_names = [model_name] if model_name and model_name != "default" else None - - bubble_texts = [] - bubble_meta = [] - for header in headers: - bid = header.get("bubbleId") - bubble_entry = bubble_map.get(bid) - if not bubble_entry: - continue - text = bubble_entry.get("text") or "" - if text: - bubble_texts.append(text) - raw_bubble = bubble_entry.get("raw") - if raw_bubble: - bubble_meta.append(_json_dump_safe(raw_bubble)) - - exclusion_text = _build_exclusion_searchable( - project_name=project_name, - chat_title=title, - 
model_names=model_names, - content_parts=bubble_texts, - metadata_parts=[ - _json_dump_safe(model_config), - _json_dump_safe(cd.get("conversationSummary")), - _json_dump_safe(cd.get("usage")), - _json_dump_safe(cd.get("requestMetadata")), - _json_dump_safe(cd), - "\n".join(bubble_meta), - ], - ) - if is_excluded_by_rules(rules, exclusion_text): - continue - - # Check if any bubble text matches - has_match = False - matching_text = "" - # Check title - if title and query_lower in title.lower(): - has_match = True - matching_text = title - - # Check bubble texts - if not has_match: - for text in bubble_texts: - if text and query_lower in text.lower(): - has_match = True - # Extract a snippet around the match - idx = text.lower().find(query_lower) - start = max(0, idx - 80) - end = min(len(text), idx + len(query) + 120) - matching_text = ("..." if start > 0 else "") + text[start:end] + ("..." if end < len(text) else "") - break - - if has_match: - if not title: - # Derive title from first bubble - for text in bubble_texts: - if text: - first_lines = [ln for ln in text.split("\n") if ln.strip()] - if first_lines: - title = first_lines[0][:100] - break - if not title: - title = f"Conversation {composer_id[:8]}" - - results.append({ - "workspaceId": ws_id, - "workspaceFolder": ws_name, - "chatId": composer_id, - "chatTitle": title, - "timestamp": to_epoch_ms(composer.last_updated_at) or to_epoch_ms(composer.created_at) or int(datetime.now().timestamp() * 1000), - "matchingText": matching_text, - "type": "composer", - }) - except Exception as e: - _logger.warning( - "Failed to process Composer from composerData:%s during search: %s", - composer_id, - e, - ) - parse_warnings.record_composer_processing_failure() - - except Exception: - _logger.exception("Error searching global storage") - finally: - if conn is not None: - conn.close() - - # --------------------------------------------------------------- - # Search per-workspace ItemTable (legacy format — fallback) - # 
--------------------------------------------------------------- - try: - for name in os.listdir(workspace_path): - full = os.path.join(workspace_path, name) - if not os.path.isdir(full): - continue - db_path = os.path.join(full, "state.vscdb") - wj_path = os.path.join(full, "workspace.json") - if not os.path.isfile(db_path): - continue - - workspace_folder = None - try: - with open(wj_path, "r", encoding="utf-8") as f: - wd = json.load(f) - workspace_folder = wd.get("folder") - except Exception as e: - warn_workspace_json_read(_logger, name, e) - workspace_name = _workspace_display_name_from_folder(workspace_folder, fallback=name) - - # try/finally guarantees .close() on every exit path (issue #17). - conn = None - try: - conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) - - # Search chat logs - if search_type in ("all", "chat"): - chat_row = conn.execute( - "SELECT value FROM ItemTable WHERE [key] = 'workbench.panel.aichat.view.aichat.chatdata'" - ).fetchone() - if chat_row and chat_row[0]: - data = json.loads(chat_row[0]) - for tab in (data.get("tabs") or []): - ct = tab.get("chatTitle") or "" - tab_model_names = None - tab_meta = tab.get("metadata") - if isinstance(tab_meta, dict): - models_used = tab_meta.get("modelsUsed") - if isinstance(models_used, list): - tab_model_names = [str(m) for m in models_used if m] - elif tab_meta.get("model"): - tab_model_names = [str(tab_meta.get("model"))] - - tab_bubble_texts = [] - for bubble in (tab.get("bubbles") or []): - text = bubble.get("text") or "" - if text: - tab_bubble_texts.append(text) - - exclusion_text = _build_exclusion_searchable( - project_name=workspace_name, - chat_title=ct, - model_names=tab_model_names, - content_parts=tab_bubble_texts, - metadata_parts=[ - _json_dump_safe(tab), - _json_dump_safe(workspace_folder), - ], - ) - if is_excluded_by_rules(rules, exclusion_text): - continue - - has_match = False - matching_text = "" - - if ct.lower().find(query_lower) != -1: - has_match = True - 
matching_text = ct - - for bubble in (tab.get("bubbles") or []): - text = bubble.get("text") or "" - if text.lower().find(query_lower) != -1: - has_match = True - idx = text.lower().find(query_lower) - start = max(0, idx - 80) - end = min(len(text), idx + len(query) + 120) - matching_text = ("..." if start > 0 else "") + text[start:end] + ("..." if end < len(text) else "") - break - - if has_match: - results.append({ - "workspaceId": name, - "workspaceFolder": workspace_folder, - "chatId": tab.get("tabId"), - "chatTitle": ct or f"Chat {(tab.get('tabId') or '')[:8]}", - "timestamp": tab.get("lastSendTime") or datetime.now().isoformat(), - "matchingText": matching_text, - "type": "chat", - }) - - except Exception as e: - _logger.warning( - "Failed to search legacy workspace %s: %s", - name, - e, - ) - finally: - if conn is not None: - conn.close() - except Exception as e: - _logger.warning( - "Failed to iterate legacy workspaces under %s: %s", - workspace_path, - e, - ) - - # --------------------------------------------------------------- - # Search Cursor CLI sessions (only for type=all) - # --------------------------------------------------------------- + results = [] + results.extend( + search_global_storage(workspace_path, query, query_lower, rules, parse_warnings) + ) + results.extend( + search_legacy_workspaces(workspace_path, query, query_lower, search_type, rules) + ) if search_type == "all": - try: - cli_projects = list_cli_projects(get_cli_chats_path()) - for cp in cli_projects: - ws_name = cp["workspace_name"] or cp["project_id"][:12] - for session in cp["sessions"]: - meta = session.get("meta", {}) - session_id = session["session_id"] - created_ms: int = meta.get("createdAt") or int(datetime.now().timestamp() * 1000) - session_name = meta.get("name") or f"Session {session_id[:8]}" - - try: - messages = traverse_blobs(session["db_path"]) - except Exception as e: - _logger.warning( - "Failed to traverse CLI session blobs for %s: %s", - session_id, - e, - ) 
- continue - - bubbles = messages_to_bubbles(messages, created_ms) - if not bubbles: - continue - - # Derive title - title = session_name - if not title or title.startswith("New Agent"): - for b in bubbles: - if b["type"] == "user" and b.get("text"): - first_lines = [ln for ln in b["text"].split("\n") if ln.strip()] - if first_lines: - title = first_lines[0][:100] - break - - bubble_texts = [b["text"] for b in bubbles if b.get("text")] - tool_payloads = [ - tc.get("input") or tc.get("summary") or "" - for b in bubbles - for tc in (b.get("metadata") or {}).get("toolCalls") or [] - ] - exclusion_text = _build_exclusion_searchable( - project_name=ws_name, - chat_title=title, - content_parts=bubble_texts + tool_payloads, - ) - if is_excluded_by_rules(rules, exclusion_text): - continue - - has_match = False - matching_text = "" - - if title and query_lower in title.lower(): - has_match = True - matching_text = title - - if not has_match: - for text in bubble_texts: - if text and query_lower in text.lower(): - has_match = True - idx = text.lower().find(query_lower) - start = max(0, idx - 80) - end = min(len(text), idx + len(query) + 120) - matching_text = ( - ("..." if start > 0 else "") - + text[start:end] - + ("..." 
if end < len(text) else "") - ) - break - - if has_match: - results.append({ - "workspaceId": f"cli:{cp['project_id']}", - "workspaceFolder": cp.get("workspace_path"), - "chatId": session_id, - "chatTitle": title, - "timestamp": created_ms, - "matchingText": matching_text, - "type": "cli_agent", - "source": "cli", - }) - except Exception: - _logger.exception("Error searching CLI sessions") - - # Sort by timestamp descending - def _ts(r): - t = r.get("timestamp", 0) - if isinstance(t, str): - try: - return datetime.fromisoformat(t.replace("Z", "+00:00")).timestamp() - except Exception: - return 0 - return t - results.sort(key=_ts, reverse=True) + results.extend( + search_cli_sessions(get_cli_chats_path(), query, query_lower, rules) + ) - payload: dict = {"results": results} + payload: dict = {"results": rank_results(results)} return jsonify(parse_warnings.attach_to(payload)) except Exception: diff --git a/services/search.py b/services/search.py new file mode 100644 index 0000000..a54d0d5 --- /dev/null +++ b/services/search.py @@ -0,0 +1,567 @@ +"""Search helpers: three independent data-source readers for /api/search. + +Each public function targets exactly one data source, accepts explicit inputs +with no Flask request-context dependency, and returns a plain list of result +dicts. The route handler in ``api/search.py`` calls all three and merges. 
+ +Data sources +------------ +* :func:`search_global_storage` — composerData rows in global ``cursorDiskKV`` +* :func:`search_legacy_workspaces` — per-workspace ItemTable (legacy chat format) +* :func:`search_cli_sessions` — JSONL files from Cursor CLI agent sessions + +Aggregation +----------- +* :func:`rank_results` — sort merged results by timestamp descending +""" + +from __future__ import annotations + +import json +import logging +import os +import sqlite3 +from contextlib import closing +from datetime import datetime +from pathlib import Path + +from models import Bubble, Composer, ParseWarningCollector, SchemaError +from services.workspace_db import ( + build_composer_id_to_workspace_id, + collect_workspace_entries, + open_global_db, +) +from utils.cli_chat_reader import list_cli_projects, messages_to_bubbles, traverse_blobs +from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules +from utils.path_helpers import ( + get_workspace_display_name, + to_epoch_ms, + warn_workspace_json_read, +) +from utils.text_extract import extract_text_from_bubble + +_logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Private helpers — pure functions / small utilities +# --------------------------------------------------------------------------- + + +def _json_dump_safe(value) -> str: + """Best-effort JSON serialisation for exclusion-rule matching.""" + try: + return json.dumps(value, ensure_ascii=False, sort_keys=True) + except Exception: + return str(value) if value is not None else "" + + +def _build_exclusion_searchable( + *, + project_name: str | None, + chat_title: str | None, + model_names: list[str] | None = None, + content_parts: list[str] | None = None, + metadata_parts: list[str] | None = None, +) -> str: + """Compose broad searchable text so exclusion rules cover all visible fields.""" + combined: list[str] = [] + if content_parts: + combined.extend(p for p in 
content_parts if p) + if metadata_parts: + combined.extend(p for p in metadata_parts if p) + return build_searchable_text( + project_name=project_name, + chat_title=chat_title, + model_names=model_names, + chat_content_snippet="\n\n".join(combined) if combined else None, + ) + + +def _extract_snippet(text: str, query: str, query_lower: str) -> str: + """Return a context window around the first match of *query* in *text*. + + Returns an empty string if there is no match. + """ + idx = text.lower().find(query_lower) + if idx == -1: + return "" + start = max(0, idx - 80) + end = min(len(text), idx + len(query) + 120) + return ( + ("..." if start > 0 else "") + + text[start:end] + + ("..." if end < len(text) else "") + ) + + +def _find_match( + title: str, + bubble_texts: list[str], + query_lower: str, + query: str, +) -> tuple[bool, str]: + """Check whether a conversation matches the search query. + + Returns ``(has_match, matching_text)`` where *matching_text* is either the + full title (on a title hit) or a snippet around the first bubble match. + """ + if title and query_lower in title.lower(): + return True, title + for text in bubble_texts: + if text and query_lower in text.lower(): + return True, _extract_snippet(text, query, query_lower) + return False, "" + + +# --------------------------------------------------------------------------- +# Private data builders +# --------------------------------------------------------------------------- + + +def _build_ws_id_to_name( + workspace_path: str, + workspace_entries: list[dict], +) -> dict[str, str]: + """Map workspace folder IDs to human-readable display names. + + Reads each workspace's ``workspace.json`` via + :func:`~utils.path_helpers.get_workspace_display_name`. Entries whose + JSON cannot be read are silently skipped (warning logged). 
+ """ + mapping: dict[str, str] = {} + for entry in workspace_entries: + try: + with open(entry["workspaceJsonPath"], "r", encoding="utf-8") as fh: + wd = json.load(fh) + name = get_workspace_display_name(wd) + if name: + mapping[entry["name"]] = name + except Exception as exc: + warn_workspace_json_read(_logger, entry["name"], exc) + return mapping + + +def _build_search_bubble_map( + global_db, + parse_warnings: ParseWarningCollector, +) -> dict[str, dict]: + """Load ``bubbleId:*`` rows from an open global DB connection. + + Returns ``{bubble_id: {"text": str, "raw": dict}}``. Rows that fail + schema validation or JSON decoding are skipped; the skip is recorded in + *parse_warnings*. + """ + bubble_map: dict[str, dict] = {} + for row in global_db.execute( + "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'" + ): + parts = row["key"].split(":") + if len(parts) < 3: + continue + bid = parts[2] + try: + bubble = Bubble.from_dict(json.loads(row["value"]), bubble_id=bid) + bubble_map[bid] = {"text": extract_text_from_bubble(bubble), "raw": bubble.raw} + except SchemaError as exc: + _logger.warning( + "Schema drift in bubble %s: %s (%s)", bid, exc, type(exc).__name__ + ) + parse_warnings.record_bubble_skipped() + except (json.JSONDecodeError, TypeError, ValueError) as exc: + _logger.warning("Failed to decode Bubble from bubbleId:%s: %s", bid, exc) + parse_warnings.record_bubble_skipped() + return bubble_map + + +# --------------------------------------------------------------------------- +# Public: per-source search functions +# --------------------------------------------------------------------------- + + +def search_global_storage( + workspace_path: str, + query: str, + query_lower: str, + rules: list, + parse_warnings: ParseWarningCollector, +) -> list[dict]: + """Search composer conversations stored in the global ``cursorDiskKV`` table. + + This is the primary data source for current Cursor versions. 
+ + Args: + workspace_path: Cursor workspaceStorage root directory. + query: Raw search string (used for snippet extraction). + query_lower: ``query.lower()`` (pre-computed by caller). + rules: Parsed exclusion rules from app config. + parse_warnings: Collector that accumulates parse/schema failures. + + Returns: + List of search result dicts with keys ``workspaceId``, ``workspaceFolder``, + ``chatId``, ``chatTitle``, ``timestamp``, ``matchingText``, ``type``. + """ + results: list[dict] = [] + try: + workspace_entries = collect_workspace_entries(workspace_path) + ws_id_to_name = _build_ws_id_to_name(workspace_path, workspace_entries) + composer_id_to_ws = build_composer_id_to_workspace_id( + workspace_path, workspace_entries + ) + + with open_global_db(workspace_path) as (conn, _db_path): + if conn is None: + return results + bubble_map = _build_search_bubble_map(conn, parse_warnings) + composer_rows = conn.execute( + "SELECT key, value FROM cursorDiskKV" + " WHERE key LIKE 'composerData:%' AND LENGTH(value) > 10" + ).fetchall() + + for row in composer_rows: + composer_id = row["key"].split(":")[1] + try: + composer = Composer.from_dict( + json.loads(row["value"]), composer_id=composer_id + ) + except SchemaError as exc: + _logger.warning( + "Schema drift in composer %s: %s (%s)", + composer_id, + exc, + type(exc).__name__, + ) + parse_warnings.record_composer_skipped() + continue + except (json.JSONDecodeError, TypeError, ValueError) as exc: + _logger.warning( + "Failed to decode Composer from composerData:%s: %s", + composer_id, + exc, + ) + parse_warnings.record_composer_skipped() + continue + + try: + headers = composer.full_conversation_headers_only + if not headers: + continue + + title = composer.name or "" + ws_id = composer_id_to_ws.get(composer_id, "global") + ws_name = ws_id_to_name.get(ws_id) + project_name = ws_name or ("Other chats" if ws_id == "global" else ws_id) + + cd = composer.raw + model_config = composer.model_config + model_name = 
model_config.get("modelName") + model_names = ( + [model_name] if model_name and model_name != "default" else None + ) + + bubble_texts: list[str] = [] + bubble_meta: list[str] = [] + for header in headers: + bid = header.get("bubbleId") + entry = bubble_map.get(bid) + if not entry: + continue + text = entry.get("text") or "" + if text: + bubble_texts.append(text) + raw_bubble = entry.get("raw") + if raw_bubble: + bubble_meta.append(_json_dump_safe(raw_bubble)) + + exclusion_text = _build_exclusion_searchable( + project_name=project_name, + chat_title=title, + model_names=model_names, + content_parts=bubble_texts, + metadata_parts=[ + _json_dump_safe(model_config), + _json_dump_safe(cd.get("conversationSummary")), + _json_dump_safe(cd.get("usage")), + _json_dump_safe(cd.get("requestMetadata")), + _json_dump_safe(cd), + "\n".join(bubble_meta), + ], + ) + if is_excluded_by_rules(rules, exclusion_text): + continue + + has_match, matching_text = _find_match( + title, bubble_texts, query_lower, query + ) + if not has_match: + continue + + if not title: + for text in bubble_texts: + if text: + first_lines = [ln for ln in text.split("\n") if ln.strip()] + if first_lines: + title = first_lines[0][:100] + break + if not title: + title = f"Conversation {composer_id[:8]}" + + results.append({ + "workspaceId": ws_id, + "workspaceFolder": ws_name, + "chatId": composer_id, + "chatTitle": title, + "timestamp": ( + to_epoch_ms(composer.last_updated_at) + or to_epoch_ms(composer.created_at) + or int(datetime.now().timestamp() * 1000) + ), + "matchingText": matching_text, + "type": "composer", + }) + except Exception as exc: + _logger.warning( + "Failed to process Composer from composerData:%s during search: %s", + composer_id, + exc, + ) + parse_warnings.record_composer_processing_failure() + + except Exception: + _logger.exception("Error searching global storage") + + return results + + +def search_legacy_workspaces( + workspace_path: str, + query: str, + query_lower: str, + 
search_type: str, + rules: list, +) -> list[dict]: + """Search legacy per-workspace ItemTable chat data. + + Iterates per-workspace ``state.vscdb`` files looking for the + ``workbench.panel.aichat.view.aichat.chatdata`` key (present in older + Cursor versions before global storage migration). + + Args: + workspace_path: Cursor workspaceStorage root directory. + query: Raw search string (used for snippet extraction). + query_lower: ``query.lower()`` (pre-computed by caller). + search_type: ``"all"`` or ``"chat"`` — other values return immediately. + rules: Parsed exclusion rules from app config. + + Returns: + List of search result dicts with ``type`` set to ``"chat"``. + """ + results: list[dict] = [] + if search_type not in ("all", "chat"): + return results + + try: + for name in os.listdir(workspace_path): + full = os.path.join(workspace_path, name) + if not os.path.isdir(full): + continue + db_path = os.path.join(full, "state.vscdb") + wj_path = os.path.join(full, "workspace.json") + if not os.path.isfile(db_path): + continue + + workspace_folder: str | None = None + workspace_name = name + try: + with open(wj_path, "r", encoding="utf-8") as fh: + wd = json.load(fh) + workspace_folder = wd.get("folder") + workspace_name = get_workspace_display_name(wd, fallback=name) + except Exception as exc: + warn_workspace_json_read(_logger, name, exc) + + db_uri = Path(db_path).resolve().as_uri() + "?mode=ro" + try: + with closing(sqlite3.connect(db_uri, uri=True)) as conn: + chat_row = conn.execute( + "SELECT value FROM ItemTable" + " WHERE [key] = 'workbench.panel.aichat.view.aichat.chatdata'" + ).fetchone() + + if not (chat_row and chat_row[0]): + continue + + data = json.loads(chat_row[0]) + for tab in (data.get("tabs") or []): + ct = tab.get("chatTitle") or "" + + tab_model_names: list[str] | None = None + tab_meta = tab.get("metadata") + if isinstance(tab_meta, dict): + models_used = tab_meta.get("modelsUsed") + if isinstance(models_used, list): + tab_model_names = 
[str(m) for m in models_used if m] + elif tab_meta.get("model"): + tab_model_names = [str(tab_meta.get("model"))] + + tab_bubble_texts = [ + bubble.get("text") or "" + for bubble in (tab.get("bubbles") or []) + if bubble.get("text") + ] + exclusion_text = _build_exclusion_searchable( + project_name=workspace_name, + chat_title=ct, + model_names=tab_model_names, + content_parts=tab_bubble_texts, + metadata_parts=[ + _json_dump_safe(tab), + _json_dump_safe(workspace_folder), + ], + ) + if is_excluded_by_rules(rules, exclusion_text): + continue + + has_match, matching_text = _find_match( + ct, tab_bubble_texts, query_lower, query + ) + if not has_match: + continue + + results.append({ + "workspaceId": name, + "workspaceFolder": workspace_folder, + "chatId": tab.get("tabId"), + "chatTitle": ct or f"Chat {(tab.get('tabId') or '')[:8]}", + "timestamp": tab.get("lastSendTime") or datetime.now().isoformat(), + "matchingText": matching_text, + "type": "chat", + }) + + except Exception as exc: + _logger.warning("Failed to search legacy workspace %s: %s", name, exc) + + except Exception as exc: + _logger.warning( + "Failed to iterate legacy workspaces under %s: %s", workspace_path, exc + ) + + return results + + +def search_cli_sessions( + cli_chats_path: str, + query: str, + query_lower: str, + rules: list, +) -> list[dict]: + """Search Cursor CLI agent sessions stored as JSONL + blob files. + + Reads from ``~/.cursor/chats/`` (or the path returned by + :func:`~utils.workspace_path.get_cli_chats_path`). + + Args: + cli_chats_path: Path to the Cursor CLI chats directory. + query: Raw search string (used for snippet extraction). + query_lower: ``query.lower()`` (pre-computed by caller). + rules: Parsed exclusion rules from app config. + + Returns: + List of search result dicts with ``type`` set to ``"cli_agent"`` and + ``source`` set to ``"cli"``. 
+ """ + results: list[dict] = [] + try: + cli_projects = list_cli_projects(cli_chats_path) + for cp in cli_projects: + ws_name = cp["workspace_name"] or cp["project_id"][:12] + for session in cp["sessions"]: + meta = session.get("meta", {}) + session_id = session["session_id"] + created_ms: int = ( + meta.get("createdAt") or int(datetime.now().timestamp() * 1000) + ) + session_name: str = meta.get("name") or f"Session {session_id[:8]}" + + try: + messages = traverse_blobs(session["db_path"]) + except Exception as exc: + _logger.warning( + "Failed to traverse CLI session blobs for %s: %s", + session_id, + exc, + ) + continue + + bubbles = messages_to_bubbles(messages, created_ms) + if not bubbles: + continue + + title = session_name + if not title or title.startswith("New Agent"): + for b in bubbles: + if b["type"] == "user" and b.get("text"): + first_lines = [ + ln for ln in b["text"].split("\n") if ln.strip() + ] + if first_lines: + title = first_lines[0][:100] + break + + bubble_texts = [b["text"] for b in bubbles if b.get("text")] + tool_payloads = [ + tc.get("input") or tc.get("summary") or "" + for b in bubbles + for tc in (b.get("metadata") or {}).get("toolCalls") or [] + ] + exclusion_text = _build_exclusion_searchable( + project_name=ws_name, + chat_title=title, + content_parts=bubble_texts + tool_payloads, + ) + if is_excluded_by_rules(rules, exclusion_text): + continue + + has_match, matching_text = _find_match( + title, bubble_texts, query_lower, query + ) + if not has_match: + continue + + results.append({ + "workspaceId": f"cli:{cp['project_id']}", + "workspaceFolder": cp.get("workspace_path"), + "chatId": session_id, + "chatTitle": title, + "timestamp": created_ms, + "matchingText": matching_text, + "type": "cli_agent", + "source": "cli", + }) + except Exception: + _logger.exception("Error searching CLI sessions") + + return results + + +# --------------------------------------------------------------------------- +# Aggregation +# 
--------------------------------------------------------------------------- + + +def rank_results(results: list[dict]) -> list[dict]: + """Sort *results* by timestamp descending. + + Handles both integer epoch-ms timestamps and ISO 8601 strings so the + three source types (composer, chat, cli_agent) sort together correctly. + """ + def _ts(r: dict) -> float: + t = r.get("timestamp", 0) + if isinstance(t, str): + try: + return datetime.fromisoformat(t.replace("Z", "+00:00")).timestamp() + except Exception: + return 0.0 + return float(t) if t else 0.0 + + return sorted(results, key=_ts, reverse=True) diff --git a/tests/test_models_wired_at_read_sites.py b/tests/test_models_wired_at_read_sites.py index bdda6e0..d1428e6 100644 --- a/tests/test_models_wired_at_read_sites.py +++ b/tests/test_models_wired_at_read_sites.py @@ -98,7 +98,7 @@ def tearDown(self): def test_search_endpoint_calls_bubble_from_dict(self): from app import create_app - import api.search as search_mod + import services.search as search_mod app = create_app() app.config["TESTING"] = True app.config["EXCLUSION_RULES"] = [] @@ -149,7 +149,7 @@ def test_bubble_schema_drift_is_logged_not_swallowed_silently(self): app = create_app() app.config["TESTING"] = True app.config["EXCLUSION_RULES"] = [] - with self.assertLogs("api.search", level="WARNING") as logs: + with self.assertLogs("services.search", level="WARNING") as logs: client = app.test_client() response = client.get("/api/search?q=sentinel-wired") self.assertEqual(response.status_code, 200) diff --git a/tests/test_search_helpers.py b/tests/test_search_helpers.py new file mode 100644 index 0000000..f86a537 --- /dev/null +++ b/tests/test_search_helpers.py @@ -0,0 +1,472 @@ +""" +Unit tests for services/search.py — the three decomposed search functions +and shared helpers extracted from the monolithic api/search.py handler. 
+ +Each test class targets a single extracted function so failures pinpoint +the exact data-source reader that broke, independently of the Flask layer. + +Run: + pytest tests/test_search_helpers.py -v +""" + +from __future__ import annotations + +import contextlib +import json +import os +import sqlite3 +import tempfile +from pathlib import Path + +import pytest + +from models import ParseWarningCollector +from services.search import ( + _build_exclusion_searchable, + _extract_snippet, + _find_match, + rank_results, + search_cli_sessions, + search_global_storage, + search_legacy_workspaces, +) + + +# --------------------------------------------------------------------------- +# _extract_snippet +# --------------------------------------------------------------------------- + + +class TestExtractSnippet: + def test_match_at_start_no_leading_ellipsis(self): + text = "hello world foo" + snippet = _extract_snippet(text, "hello", "hello") + assert snippet.startswith("hello") + assert not snippet.startswith("...") + + def test_match_in_middle_adds_ellipsis(self): + padding = "x" * 200 + text = padding + "needle" + padding + snippet = _extract_snippet(text, "needle", "needle") + assert "needle" in snippet + assert snippet.startswith("...") + assert snippet.endswith("...") + + def test_no_match_returns_empty_string(self): + assert _extract_snippet("no match here", "xyz", "xyz") == "" + + def test_case_insensitive_query_lower(self): + text = "The Query appears here" + snippet = _extract_snippet(text, "Query", "query") + assert "Query" in snippet + + def test_snippet_length_is_bounded(self): + text = "a" * 1000 + "target" + "b" * 1000 + snippet = _extract_snippet(text, "target", "target") + # Context window: 80 before + len("target") + 120 after = ~206 chars + ellipses + assert len(snippet) < 300 + + +# --------------------------------------------------------------------------- +# _find_match +# --------------------------------------------------------------------------- + + 
+class TestFindMatch: + def test_title_match_returns_full_title(self): + has_match, text = _find_match("hello query world", [], "query", "query") + assert has_match + assert text == "hello query world" + + def test_bubble_match_returns_snippet(self): + has_match, text = _find_match( + "", + ["padding " * 20 + "needle" + " padding" * 20], + "needle", + "needle", + ) + assert has_match + assert "needle" in text + + def test_no_match_returns_false_and_empty(self): + has_match, text = _find_match("nothing here", ["also nothing"], "xyz", "xyz") + assert not has_match + assert text == "" + + def test_title_checked_before_bubbles(self): + # Both title and bubble contain the term; title should win. + has_match, text = _find_match( + "The query is in the title", + ["The query is also in bubbles"], + "query", + "query", + ) + assert has_match + assert text == "The query is in the title" + + def test_case_insensitive_title_match(self): + has_match, _ = _find_match("HELLO WORLD", [], "hello", "hello") + assert has_match + + def test_empty_title_and_empty_bubbles_no_match(self): + has_match, text = _find_match("", [], "q", "q") + assert not has_match + assert text == "" + + +# --------------------------------------------------------------------------- +# rank_results +# --------------------------------------------------------------------------- + + +class TestRankResults: + def test_sorted_by_timestamp_descending(self): + results = [ + {"timestamp": 1000}, + {"timestamp": 3000}, + {"timestamp": 2000}, + ] + ranked = rank_results(results) + assert [r["timestamp"] for r in ranked] == [3000, 2000, 1000] + + def test_iso_string_timestamps_sort_correctly(self): + results = [ + {"timestamp": "2024-01-01T00:00:00Z"}, + {"timestamp": "2025-01-01T00:00:00Z"}, + {"timestamp": "2023-01-01T00:00:00Z"}, + ] + ranked = rank_results(results) + assert ranked[0]["timestamp"] == "2025-01-01T00:00:00Z" + assert ranked[-1]["timestamp"] == "2023-01-01T00:00:00Z" + + def 
test_empty_list_returns_empty(self): + assert rank_results([]) == [] + + def test_missing_timestamp_treated_as_zero(self): + results = [{"timestamp": 500}, {}, {"timestamp": 100}] + ranked = rank_results(results) + assert ranked[0]["timestamp"] == 500 + # Missing timestamp entry sorts last + assert "timestamp" not in ranked[-1] + + +# --------------------------------------------------------------------------- +# Fixtures — minimal SQLite databases for integration-style unit tests +# --------------------------------------------------------------------------- + + +@pytest.fixture +def tmp_workspace_root(): + """Temporary workspaceStorage + globalStorage directory pair.""" + with tempfile.TemporaryDirectory() as tmp: + ws_root = os.path.join(tmp, "workspaceStorage") + global_root = os.path.join(tmp, "globalStorage") + cli_root = os.path.join(tmp, "cli_chats") + os.makedirs(ws_root, exist_ok=True) + os.makedirs(global_root, exist_ok=True) + os.makedirs(cli_root, exist_ok=True) + yield { + "ws_root": ws_root, + "global_root": global_root, + "cli_root": cli_root, + "tmp": tmp, + } + + +def _make_global_db(global_root: str, composer_id: str, bubble_text: str) -> None: + """Seed globalStorage/state.vscdb with one composer + one bubble.""" + db_path = os.path.join(global_root, "state.vscdb") + with contextlib.closing(sqlite3.connect(db_path)) as conn: + conn.execute("CREATE TABLE cursorDiskKV ([key] TEXT PRIMARY KEY, value TEXT)") + conn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + ( + f"bubbleId:{composer_id}:bub-1", + json.dumps({"type": "user", "text": bubble_text}), + ), + ) + conn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + ( + f"composerData:{composer_id}", + json.dumps({ + "name": "Test conversation", + "createdAt": 1_715_000_000_000, + "lastUpdatedAt": 1_715_001_000_000, + "fullConversationHeadersOnly": [{"bubbleId": "bub-1"}], + "modelConfig": {"modelName": "gpt-4o"}, + }), + ), + ) + conn.commit() + + +def 
_make_workspace_db( + ws_root: str, + workspace_id: str, + composer_id: str, + folder: str, + legacy_chat_text: str | None = None, +) -> None: + """Seed a per-workspace state.vscdb + workspace.json.""" + ws_dir = os.path.join(ws_root, workspace_id) + os.makedirs(ws_dir, exist_ok=True) + with open(os.path.join(ws_dir, "workspace.json"), "w", encoding="utf-8") as fh: + json.dump({"folder": folder}, fh) + db_path = os.path.join(ws_dir, "state.vscdb") + with contextlib.closing(sqlite3.connect(db_path)) as conn: + conn.execute("CREATE TABLE ItemTable ([key] TEXT PRIMARY KEY, value TEXT)") + conn.execute( + "INSERT INTO ItemTable ([key], value) VALUES (?, ?)", + ( + "composer.composerData", + json.dumps({"allComposers": [{"composerId": composer_id}]}), + ), + ) + if legacy_chat_text is not None: + legacy_data = { + "tabs": [{ + "tabId": "tab-legacy-1", + "chatTitle": "Legacy chat", + "lastSendTime": "2026-01-01T00:00:00Z", + "bubbles": [{"type": "user", "text": legacy_chat_text}], + }] + } + conn.execute( + "INSERT INTO ItemTable ([key], value) VALUES (?, ?)", + ( + "workbench.panel.aichat.view.aichat.chatdata", + json.dumps(legacy_data), + ), + ) + conn.commit() + + +# --------------------------------------------------------------------------- +# search_global_storage +# --------------------------------------------------------------------------- + + +class TestSearchGlobalStorage: + def test_returns_matching_composer(self, tmp_workspace_root): + dirs = tmp_workspace_root + _make_global_db(dirs["global_root"], "cmp-gs-1", "unique-search-term-gs") + _make_workspace_db(dirs["ws_root"], "ws-gs-1", "cmp-gs-1", "/projects/myapp") + + results = search_global_storage( + workspace_path=dirs["ws_root"], + query="unique-search-term-gs", + query_lower="unique-search-term-gs", + rules=[], + parse_warnings=ParseWarningCollector(), + ) + + assert len(results) >= 1 + assert any(r["chatId"] == "cmp-gs-1" for r in results) + + def test_no_match_returns_empty_list(self, 
tmp_workspace_root): + dirs = tmp_workspace_root + _make_global_db(dirs["global_root"], "cmp-gs-2", "some other content") + _make_workspace_db(dirs["ws_root"], "ws-gs-2", "cmp-gs-2", "/projects/other") + + results = search_global_storage( + workspace_path=dirs["ws_root"], + query="xyzzy-no-match-ever", + query_lower="xyzzy-no-match-ever", + rules=[], + parse_warnings=ParseWarningCollector(), + ) + + assert results == [] + + def test_result_has_required_keys(self, tmp_workspace_root): + dirs = tmp_workspace_root + _make_global_db(dirs["global_root"], "cmp-gs-3", "search-key-check") + _make_workspace_db(dirs["ws_root"], "ws-gs-3", "cmp-gs-3", "/projects/keys") + + results = search_global_storage( + workspace_path=dirs["ws_root"], + query="search-key-check", + query_lower="search-key-check", + rules=[], + parse_warnings=ParseWarningCollector(), + ) + + assert results + r = results[0] + for key in ("workspaceId", "chatId", "chatTitle", "timestamp", "matchingText", "type"): + assert key in r, f"missing key: {key}" + assert r["type"] == "composer" + assert isinstance(r["timestamp"], int) + + def test_missing_global_db_returns_empty(self, tmp_workspace_root): + dirs = tmp_workspace_root + # No global DB created — directory exists but state.vscdb absent. 
+ results = search_global_storage( + workspace_path=dirs["ws_root"], + query="anything", + query_lower="anything", + rules=[], + parse_warnings=ParseWarningCollector(), + ) + assert results == [] + + def test_workspace_display_name_resolved(self, tmp_workspace_root): + dirs = tmp_workspace_root + _make_global_db(dirs["global_root"], "cmp-gs-4", "name-check-term") + _make_workspace_db( + dirs["ws_root"], "ws-gs-4", "cmp-gs-4", "file:///home/user/projects/myrepo" + ) + + results = search_global_storage( + workspace_path=dirs["ws_root"], + query="name-check-term", + query_lower="name-check-term", + rules=[], + parse_warnings=ParseWarningCollector(), + ) + + assert results + # Workspace folder name is resolved to the basename of the folder path. + assert results[0]["workspaceFolder"] == "myrepo" + + +# --------------------------------------------------------------------------- +# search_legacy_workspaces +# --------------------------------------------------------------------------- + + +class TestSearchLegacyWorkspaces: + def test_returns_matching_legacy_tab(self, tmp_workspace_root): + dirs = tmp_workspace_root + _make_workspace_db( + dirs["ws_root"], + "ws-leg-1", + "cmp-leg-1", + "/projects/legacyapp", + legacy_chat_text="legacy-unique-search-text", + ) + + results = search_legacy_workspaces( + workspace_path=dirs["ws_root"], + query="legacy-unique-search-text", + query_lower="legacy-unique-search-text", + search_type="all", + rules=[], + ) + + assert len(results) >= 1 + assert any(r.get("type") == "chat" for r in results) + + def test_no_match_returns_empty(self, tmp_workspace_root): + dirs = tmp_workspace_root + _make_workspace_db( + dirs["ws_root"], + "ws-leg-2", + "cmp-leg-2", + "/projects/other", + legacy_chat_text="something else entirely", + ) + + results = search_legacy_workspaces( + workspace_path=dirs["ws_root"], + query="xyzzy-absolutely-no-match", + query_lower="xyzzy-absolutely-no-match", + search_type="all", + rules=[], + ) + + assert results == [] + + 
def test_search_type_composer_returns_empty(self, tmp_workspace_root): + dirs = tmp_workspace_root + _make_workspace_db( + dirs["ws_root"], + "ws-leg-3", + "cmp-leg-3", + "/projects/skip", + legacy_chat_text="type-guard-term", + ) + + results = search_legacy_workspaces( + workspace_path=dirs["ws_root"], + query="type-guard-term", + query_lower="type-guard-term", + search_type="composer", + rules=[], + ) + + # Legacy workspaces only hold chat (type="chat"); composer search skips them. + assert results == [] + + def test_result_has_required_keys(self, tmp_workspace_root): + dirs = tmp_workspace_root + _make_workspace_db( + dirs["ws_root"], + "ws-leg-4", + "cmp-leg-4", + "/projects/keycheck", + legacy_chat_text="key-check-legacy", + ) + + results = search_legacy_workspaces( + workspace_path=dirs["ws_root"], + query="key-check-legacy", + query_lower="key-check-legacy", + search_type="chat", + rules=[], + ) + + assert results + r = results[0] + for key in ("workspaceId", "chatId", "chatTitle", "timestamp", "matchingText", "type"): + assert key in r, f"missing key: {key}" + assert r["type"] == "chat" + + def test_workspace_without_legacy_data_skipped(self, tmp_workspace_root): + dirs = tmp_workspace_root + # Workspace DB exists but has no chatdata key (modern workspaces). + _make_workspace_db( + dirs["ws_root"], + "ws-leg-5", + "cmp-leg-5", + "/projects/modern", + legacy_chat_text=None, # no legacy chatdata row + ) + + results = search_legacy_workspaces( + workspace_path=dirs["ws_root"], + query="anything", + query_lower="anything", + search_type="all", + rules=[], + ) + + assert results == [] + + +# --------------------------------------------------------------------------- +# search_cli_sessions +# --------------------------------------------------------------------------- + + +class TestSearchCliSessions: + def test_empty_cli_dir_returns_empty(self, tmp_workspace_root): + dirs = tmp_workspace_root + # cli_root is empty — no projects, no sessions. 
+ results = search_cli_sessions( + cli_chats_path=dirs["cli_root"], + query="anything", + query_lower="anything", + rules=[], + ) + assert results == [] + + def test_nonexistent_cli_dir_returns_empty(self): + results = search_cli_sessions( + cli_chats_path="/nonexistent/path/that/does/not/exist", + query="anything", + query_lower="anything", + rules=[], + ) + assert results == [] From 4488963d825f41ed78abec52ee04acc0c47a5161 Mon Sep 17 00:00:00 2001 From: bradjin8 Date: Wed, 10 Jun 2026 13:57:57 -0400 Subject: [PATCH 2/6] fix: typecheck error --- services/search.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/search.py b/services/search.py index a54d0d5..69f6ff1 100644 --- a/services/search.py +++ b/services/search.py @@ -263,6 +263,8 @@ def search_global_storage( bubble_meta: list[str] = [] for header in headers: bid = header.get("bubbleId") + if not bid: + continue entry = bubble_map.get(bid) if not entry: continue From d08ca431c9b37c4296ec24dc1fc67c178260f88e Mon Sep 17 00:00:00 2001 From: bradjin8 Date: Wed, 10 Jun 2026 14:14:00 -0400 Subject: [PATCH 3/6] fix: add test for search helpers --- tests/test_search_helpers.py | 105 +++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/tests/test_search_helpers.py b/tests/test_search_helpers.py index f86a537..cd6fcf2 100644 --- a/tests/test_search_helpers.py +++ b/tests/test_search_helpers.py @@ -445,6 +445,35 @@ def test_workspace_without_legacy_data_skipped(self, tmp_workspace_root): assert results == [] +# --------------------------------------------------------------------------- +# CLI session fixture helper +# --------------------------------------------------------------------------- + + +def _make_store_db(path: str, meta: dict, json_blobs: dict[str, dict]) -> None: + """Create a minimal ``store.db`` with *meta* and one or more JSON blobs. 
+ + The meta value is hex-encoded JSON, matching the real Cursor CLI format + (see ``utils/cli_chat_reader._read_meta`` and ``traverse_blobs``). + Blob IDs are arbitrary strings; no chain/binary blobs are needed for a + single-message session since ``traverse_blobs`` collects the root blob + directly when it is a JSON blob. + """ + with contextlib.closing(sqlite3.connect(path)) as conn: + conn.execute("CREATE TABLE meta (key TEXT PRIMARY KEY, value TEXT)") + conn.execute("CREATE TABLE blobs (id TEXT PRIMARY KEY, data BLOB)") + conn.execute( + "INSERT INTO meta VALUES ('0', ?)", + (json.dumps(meta).encode("utf-8").hex(),), + ) + for blob_id, msg in json_blobs.items(): + conn.execute( + "INSERT INTO blobs VALUES (?, ?)", + (blob_id, json.dumps(msg).encode("utf-8")), + ) + conn.commit() + + # --------------------------------------------------------------------------- # search_cli_sessions # --------------------------------------------------------------------------- @@ -470,3 +499,79 @@ def test_nonexistent_cli_dir_returns_empty(self): rules=[], ) assert results == [] + + def test_seeded_session_found_by_content_match(self, tmp_workspace_root): + """Seed a real store.db session and verify search_cli_sessions finds it. + + Directory layout mirrors the real Cursor CLI storage: + cli_root/{project_id}/{session_id}/store.db + + The store.db contains: + - ``meta`` row: hex-encoded JSON with ``latestRootBlobId`` pointing + to the single user-message blob. + - ``blobs`` row: JSON bytes ``{"role": "user", "content": ""}`` + where ```` is the unique query we search for. 
+ """ + dirs = tmp_workspace_root + cli_root = dirs["cli_root"] + project_id = "proj-cli-test" + session_id = "sess-cli-test" + blob_id = "blob-msg-0001" + search_term = "cli-session-unique-sentinel-xyz" + + session_dir = os.path.join(cli_root, project_id, session_id) + os.makedirs(session_dir, exist_ok=True) + + _make_store_db( + path=os.path.join(session_dir, "store.db"), + meta={ + "latestRootBlobId": blob_id, + "name": "CLI search test session", + "createdAt": 1_715_100_000_000, + }, + json_blobs={ + blob_id: {"role": "user", "content": f"Please help me with {search_term}"}, + }, + ) + + results = search_cli_sessions( + cli_chats_path=cli_root, + query=search_term, + query_lower=search_term, + rules=[], + ) + + assert len(results) >= 1 + hit = next((r for r in results if r["chatId"] == session_id), None) + assert hit is not None, f"session {session_id!r} not in results: {results}" + assert hit["type"] == "cli_agent" + assert hit["source"] == "cli" + assert search_term in hit["matchingText"] + + def test_seeded_session_not_returned_when_query_misses(self, tmp_workspace_root): + """Same store.db fixture; a non-matching query must return empty.""" + dirs = tmp_workspace_root + cli_root = dirs["cli_root"] + project_id = "proj-cli-miss" + session_id = "sess-cli-miss" + blob_id = "blob-msg-miss" + + session_dir = os.path.join(cli_root, project_id, session_id) + os.makedirs(session_dir, exist_ok=True) + + _make_store_db( + path=os.path.join(session_dir, "store.db"), + meta={"latestRootBlobId": blob_id, "name": "Miss session", "createdAt": 0}, + json_blobs={ + blob_id: {"role": "user", "content": "completely unrelated content"}, + }, + ) + + results = search_cli_sessions( + cli_chats_path=cli_root, + query="xyzzy-no-match-cli", + query_lower="xyzzy-no-match-cli", + rules=[], + ) + + assert results == [] From dd5a01b20b0416f65ad5620d8d9382935442165b Mon Sep 17 00:00:00 2001 From: bradjin8 Date: Wed, 10 Jun 2026 17:01:01 -0400 Subject: [PATCH 4/6] fix; review findings 
--- .github/workflows/tests.yml | 2 +- services/search.py | 20 ++++++++++++++------ tests/test_search_helpers.py | 16 +++++++++++++++- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5b97bdc..8f642b2 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -114,7 +114,7 @@ jobs: # Pytest fixtures (tests/conftest.py) build a temp workspaceStorage and # exercise Flask routes via app.test_client(). Only listed files — not # `pytest tests/` — to avoid re-collecting unittest.TestCase classes above. - run: python -m pytest tests/test_api_endpoints.py tests/test_pdf_export.py -v --tb=short + run: python -m pytest tests/test_api_endpoints.py tests/test_pdf_export.py tests/test_search_helpers.py -v --tb=short # ── PyInstaller desktop build (Windows only, once per workflow) ──────── # Closes #44. Builds the onedir bundle and smoke-tests --help so the diff --git a/services/search.py b/services/search.py index 69f6ff1..fcbc699 100644 --- a/services/search.py +++ b/services/search.py @@ -25,6 +25,12 @@ from datetime import datetime from pathlib import Path +__all__ = [ + "search_global_storage", + "search_legacy_workspaces", + "search_cli_sessions", + "rank_results", +] from models import Bubble, Composer, ParseWarningCollector, SchemaError from services.workspace_db import ( build_composer_id_to_workspace_id, @@ -120,7 +126,6 @@ def _find_match( def _build_ws_id_to_name( - workspace_path: str, workspace_entries: list[dict], ) -> dict[str, str]: """Map workspace folder IDs to human-readable display names. 
@@ -204,7 +209,7 @@ def search_global_storage( results: list[dict] = [] try: workspace_entries = collect_workspace_entries(workspace_path) - ws_id_to_name = _build_ws_id_to_name(workspace_path, workspace_entries) + ws_id_to_name = _build_ws_id_to_name(workspace_entries) composer_id_to_ws = build_composer_id_to_workspace_id( workspace_path, workspace_entries ) @@ -435,7 +440,7 @@ def search_legacy_workspaces( "workspaceFolder": workspace_folder, "chatId": tab.get("tabId"), "chatTitle": ct or f"Chat {(tab.get('tabId') or '')[:8]}", - "timestamp": tab.get("lastSendTime") or datetime.now().isoformat(), + "timestamp": tab.get("lastSendTime") or 0, "matchingText": matching_text, "type": "chat", }) @@ -554,14 +559,17 @@ def search_cli_sessions( def rank_results(results: list[dict]) -> list[dict]: """Sort *results* by timestamp descending. - Handles both integer epoch-ms timestamps and ISO 8601 strings so the - three source types (composer, chat, cli_agent) sort together correctly. + All three source types use epoch-millisecond integers, except + ``search_legacy_workspaces`` which may emit ISO 8601 strings for the + ``lastSendTime`` field. ISO strings are converted to epoch-ms so + cross-source comparisons are made in the same unit. 
""" def _ts(r: dict) -> float: t = r.get("timestamp", 0) if isinstance(t, str): try: - return datetime.fromisoformat(t.replace("Z", "+00:00")).timestamp() + # .timestamp() → epoch-seconds; ×1000 → epoch-ms to match ints + return datetime.fromisoformat(t.replace("Z", "+00:00")).timestamp() * 1000 except Exception: return 0.0 return float(t) if t else 0.0 diff --git a/tests/test_search_helpers.py b/tests/test_search_helpers.py index cd6fcf2..6826640 100644 --- a/tests/test_search_helpers.py +++ b/tests/test_search_helpers.py @@ -22,7 +22,6 @@ from models import ParseWarningCollector from services.search import ( - _build_exclusion_searchable, _extract_snippet, _find_match, rank_results, @@ -149,6 +148,21 @@ def test_missing_timestamp_treated_as_zero(self): # Missing timestamp entry sorts last assert "timestamp" not in ranked[-1] + def test_mixed_epoch_ms_and_iso_string_sort_by_recency(self): + # composer/CLI results use integer epoch-ms (~1.715e12); + # legacy chat results may carry an ISO string from lastSendTime. + # A chat from 2025-01 must rank above a composer from 2024-05 when + # both are in the same result set. 
+ results = [ + {"timestamp": 1_715_000_000_000, "type": "composer"}, # 2024-05 + {"timestamp": "2025-01-01T00:00:00Z", "type": "chat"}, # 2025-01 + ] + ranked = rank_results(results) + assert ranked[0]["type"] == "chat", ( + "2025-01 chat must outrank 2024-05 composer; " + f"got order: {[r['type'] for r in ranked]}" + ) + # --------------------------------------------------------------------------- # Fixtures — minimal SQLite databases for integration-style unit tests From 5ce7702e7b612407f9e4a2026bf2859706a3c975 Mon Sep 17 00:00:00 2001 From: bradjin8 Date: Wed, 10 Jun 2026 18:03:46 -0400 Subject: [PATCH 5/6] fix: nitpack and outside diff comments --- models/parse_warnings.py | 19 ++++++++++++++++++- services/search.py | 9 +++++---- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/models/parse_warnings.py b/models/parse_warnings.py index 15386bd..ef167d7 100644 --- a/models/parse_warnings.py +++ b/models/parse_warnings.py @@ -1,6 +1,6 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field @dataclass @@ -10,6 +10,7 @@ class ParseWarningCollector: composers_skipped: int = 0 bubbles_skipped: int = 0 composers_processing_failed: int = 0 + source_failures: list[dict] = field(default_factory=list) def record_composer_skipped(self, count: int = 1) -> None: if count > 0: @@ -24,12 +25,22 @@ def record_composer_processing_failure(self, count: int = 1) -> None: if count > 0: self.composers_processing_failed += count + def record_source_failure(self, exc: BaseException, source: str) -> None: + """Record a whole-source failure (e.g. the global storage DB is unreadable). + + Distinct from per-item parse skips: signals that an entire data source + could not be searched so the API can warn callers that results may be + incomplete. 
+ """ + self.source_failures.append({"source": source, "detail": str(exc)}) + @property def has_warnings(self) -> bool: return ( self.composers_skipped > 0 or self.bubbles_skipped > 0 or self.composers_processing_failed > 0 + or bool(self.source_failures) ) def to_api_list(self) -> list[dict]: @@ -65,6 +76,12 @@ def to_api_list(self) -> list[dict]: f"{n} {noun} could not be fully assembled after parsing" ), }) + for sf in self.source_failures: + warnings.append({ + "type": "source_failure", + "source": sf["source"], + "detail": sf["detail"], + }) return warnings def attach_to(self, payload: dict) -> dict: diff --git a/services/search.py b/services/search.py index fcbc699..b3209a7 100644 --- a/services/search.py +++ b/services/search.py @@ -26,10 +26,10 @@ from pathlib import Path __all__ = [ + "rank_results", + "search_cli_sessions", "search_global_storage", "search_legacy_workspaces", - "search_cli_sessions", - "rank_results", ] from models import Bubble, Composer, ParseWarningCollector, SchemaError from services.workspace_db import ( @@ -334,8 +334,9 @@ def search_global_storage( ) parse_warnings.record_composer_processing_failure() - except Exception: + except Exception as exc: _logger.exception("Error searching global storage") + parse_warnings.record_source_failure(exc, source="global_storage") return results @@ -568,7 +569,7 @@ def _ts(r: dict) -> float: t = r.get("timestamp", 0) if isinstance(t, str): try: - # .timestamp() → epoch-seconds; ×1000 → epoch-ms to match ints + # .timestamp() -> epoch-seconds; x1000 -> epoch-ms to match ints return datetime.fromisoformat(t.replace("Z", "+00:00")).timestamp() * 1000 except Exception: return 0.0 From dbb094db61d266ecde7a36c31227d272345b1454 Mon Sep 17 00:00:00 2001 From: bradjin8 Date: Wed, 10 Jun 2026 18:09:00 -0400 Subject: [PATCH 6/6] fix: Do not expose raw exception text in API warnings. 
--- models/parse_warnings.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/models/parse_warnings.py b/models/parse_warnings.py index ef167d7..bcfe802 100644 --- a/models/parse_warnings.py +++ b/models/parse_warnings.py @@ -31,8 +31,13 @@ def record_source_failure(self, exc: BaseException, source: str) -> None: Distinct from per-item parse skips: signals that an entire data source could not be searched so the API can warn callers that results may be incomplete. + + The raw exception is intentionally not stored — it is logged server-side + by the caller (``_logger.exception``) before this method is invoked. + Only the source identifier is retained so ``to_api_list`` can produce a + safe client message without leaking file paths or Python internals. """ - self.source_failures.append({"source": source, "detail": str(exc)}) + self.source_failures.append({"source": source}) @property def has_warnings(self) -> bool: @@ -80,7 +85,7 @@ def to_api_list(self) -> list[dict]: warnings.append({ "type": "source_failure", "source": sf["source"], - "detail": sf["detail"], + "detail": f"Search source '{sf['source']}' could not be queried; results may be incomplete", }) return warnings