Skip to content
112 changes: 108 additions & 4 deletions src/specify_cli/integrations/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def __init__(self, key: str, project_root: Path, version: str = "") -> None:
self.project_root = project_root.resolve()
self.version = version
self._files: dict[str, str] = {} # rel_path → sha256 hex
self._recovered_files: set[str] = set()
self._installed_at: str = ""

# -- Manifest file location -------------------------------------------
Expand All @@ -131,6 +132,9 @@ def record_file(self, rel_path: str | Path, content: bytes | str) -> Path:

Creates parent directories as needed. Returns the absolute path
of the written file.
If the path was previously marked as recovered via
``record_existing(recovered=True)``, the recovered marker is
cleared because the bytes are now produced, not merely observed.

Raises ``ValueError`` if *rel_path* resolves outside the project root.
"""
Expand All @@ -144,17 +148,71 @@ def record_file(self, rel_path: str | Path, content: bytes | str) -> Path:

normalized = abs_path.relative_to(self.project_root).as_posix()
self._files[normalized] = hashlib.sha256(content).hexdigest()
# ``record_file`` writes *produced* content, so any prior
# recovered marker for this path is no longer accurate.
self._recovered_files.discard(normalized)
return abs_path

def record_existing(self, rel_path: str | Path) -> None:
"""Record the hash of an already-existing file at *rel_path*.

Raises ``ValueError`` if *rel_path* resolves outside the project root.
def record_existing(self, rel_path: str | Path, *, recovered: bool = False) -> None:
"""Record the hash of an already-existing regular file at *rel_path*.

When ``recovered=True``, the path is also marked in the manifest's
``recovered_files`` list to signal that the file's on-disk hash was
*observed* during install (because the file already existed and was not
overwritten), not *produced* by the install. Future ``refresh_managed``
runs should consult ``is_recovered`` before treating the recorded hash
as a managed baseline.

Raises:
ValueError: if *rel_path* resolves outside the project root, is
a symlink, or is not a regular file. A directory or other
non-file path cannot be silently recorded — its hash would
be meaningless and ``check_modified``/``uninstall`` would
treat the entry as permanently broken.
"""
rel = Path(rel_path)
# Cheap lexical pre-check first so absolute / parent-traversal paths
# don't trigger a filesystem stat outside the project root before
# ``_validate_rel_path`` raises. ``_validate_rel_path`` produces the
# canonical error messages used elsewhere.
if rel.is_absolute() or ".." in rel.parts:
_validate_rel_path(rel, self.project_root)
# _validate_rel_path raised for any actually-escaping path. If we reach
# here the path normalizes inside root (e.g. ``dir/../file.txt``).
# Reject anyway: manifest keys must be canonical so ``check_modified``
# and ``uninstall`` cannot key the same file under two paths.
raise ValueError(
f"Manifest paths must be canonical; '..' segments are not "
f"allowed (got {rel})"
)
# Walk each path component before resolution so a symlinked ancestor
# (e.g. ``linked_dir/file.txt`` where ``linked_dir`` is a symlink)
# cannot be silently followed by ``_validate_rel_path().resolve()``
# down to a target outside the project root. ``_ensure_safe_manifest_directory``
# uses the same pattern.
_walk = self.project_root
for part in rel.parts:
_walk = _walk / part
if _walk.is_symlink():
raise ValueError(
f"Refusing to record symlinked manifest path: {rel} "
f"(symlinked at {_walk.relative_to(self.project_root).as_posix()})"
)
abs_path = _validate_rel_path(rel, self.project_root)
Comment thread
mnriem marked this conversation as resolved.
Comment thread
mnriem marked this conversation as resolved.
if not abs_path.is_file():
raise ValueError(
f"Manifest path is not a regular file: {rel}"
)
normalized = abs_path.relative_to(self.project_root).as_posix()
self._files[normalized] = _sha256(abs_path)
if recovered:
self._recovered_files.add(normalized)
else:
# ``recovered=False`` means the caller is asserting this path is
# managed-baseline now, not merely observed; drop any stale
# recovered marker so future is_recovered() queries reflect the
# transition. ``discard`` is a no-op when the key is absent.
self._recovered_files.discard(normalized)

# -- Querying ---------------------------------------------------------

Expand All @@ -163,6 +221,37 @@ def files(self) -> dict[str, str]:
"""Return a copy of the ``{rel_path: sha256}`` mapping."""
return dict(self._files)

@property
def recovered_files(self) -> set[str]:
"""Return a copy of the set of paths recorded with ``recovered=True``.

These entries had their hashes observed (not produced) during install
because the file already existed on disk and the install skipped it.
Their on-disk bytes may be user customizations — callers that would
overwrite based on hash equality (e.g. ``refresh_managed``) MUST check
``is_recovered`` first.
"""
return set(self._recovered_files)

def is_recovered(self, rel_path: str | Path) -> bool:
"""Return True if *rel_path* was recorded via ``record_existing(recovered=True)``.

Input is normalized through the same pipeline as ``record_existing``:
absolute paths, paths escaping the project root, AND paths containing
``'..'`` segments are rejected (returned as ``False``). This mirrors
``record_existing``'s canonicalization guard — such paths can never
appear as stored keys, so the answer is always ``False``.
"""
rel = Path(rel_path)
if rel.is_absolute() or ".." in rel.parts:
return False
try:
abs_path = _validate_rel_path(rel, self.project_root)
normalized = abs_path.relative_to(self.project_root).as_posix()
except ValueError:
return False
return normalized in self._recovered_files

def check_modified(self) -> list[str]:
"""Return relative paths of tracked files whose content changed on disk."""
modified: list[str] = []
Expand Down Expand Up @@ -269,6 +358,11 @@ def save(self) -> Path:
"version": self.version,
"installed_at": self._installed_at,
"files": self._files,
**(
{"recovered_files": sorted(self._recovered_files)}
if self._recovered_files
else {}
),
}
path = self.manifest_path
content = json.dumps(data, indent=2) + "\n"
Expand Down Expand Up @@ -320,6 +414,16 @@ def load(cls, key: str, project_root: Path) -> IntegrationManifest:
inst._installed_at = data.get("installed_at", "")
inst._files = files

recovered = data.get("recovered_files", [])
if not isinstance(recovered, list) or not all(
isinstance(p, str) for p in recovered
):
raise ValueError(
Comment on lines +417 to +421
f"Integration manifest 'recovered_files' at {path} must be a "
"list of string paths"
)
inst._recovered_files = set(recovered)

stored_key = data.get("integration", "")
if stored_key and stored_key != key:
raise ValueError(
Expand Down
36 changes: 35 additions & 1 deletion src/specify_cli/shared_infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,23 @@ def _ensure_or_bucket_dir(directory: Path) -> bool:
preserved_user_files.append(rel)
else:
skipped_files.append(rel)
# Record the existing-on-disk file in the manifest so a
# fresh manifest run against an already-populated
# ``.specify/`` tree does not silently drop it (#2107).
# ``prior_hashes`` is the function-scope snapshot taken
# at entry, so this membership check is O(1) and avoids
# the repeated ``dict(self._files)`` copy that
# ``manifest.files`` performs on every access.
if dst_path.is_file() and rel not in prior_hashes:
try:
manifest.record_existing(rel, recovered=True)
except (OSError, ValueError) as exc:
# Tolerate races / permission issues / non-file
# collisions so one weird path does not abort
# the whole install.
console.print(
f"[yellow]⚠[/yellow] could not record {rel} in manifest: {exc}"
)
continue

if not _ensure_or_bucket_dir(dst_path.parent):
Expand All @@ -383,6 +400,23 @@ def _ensure_or_bucket_dir(directory: Path) -> bool:
preserved_user_files.append(rel)
else:
skipped_files.append(rel)
# Record the existing-on-disk template in the manifest so a
# fresh manifest run against an already-populated
# ``.specify/`` tree does not silently drop it (#2107).
# ``prior_hashes`` is the function-scope snapshot taken at
# entry, so this membership check is O(1) and avoids the
# repeated ``dict(self._files)`` copy that ``manifest.files``
# performs on every access.
if dst.is_file() and rel not in prior_hashes:
try:
manifest.record_existing(rel, recovered=True)
except (OSError, ValueError) as exc:
# Tolerate races / permission issues / non-file
# collisions so one weird path does not abort
# the whole install.
console.print(
f"[yellow]⚠[/yellow] could not record {rel} in manifest: {exc}"
)
continue

content = src.read_text(encoding="utf-8")
Expand All @@ -401,7 +435,7 @@ def _ensure_or_bucket_dir(directory: Path) -> bool:

if skipped_files:
console.print(
f"[yellow]⚠[/yellow] {len(skipped_files)} shared infrastructure file(s) already exist and were not updated:"
f"[yellow]⚠[/yellow] {len(skipped_files)} shared infrastructure path(s) already exist and were not updated:"
)
for path in skipped_files:
console.print(f" {path}")
Expand Down
Loading