diff --git a/src/google/adk/environment/_local_environment.py b/src/google/adk/environment/_local_environment.py index c58da7f0d9..4695ef6f52 100644 --- a/src/google/adk/environment/_local_environment.py +++ b/src/google/adk/environment/_local_environment.py @@ -139,11 +139,16 @@ async def write_file(self, path: str | Path, content: str | bytes) -> None: return await asyncio.to_thread(self._sync_write, resolved, content) def _resolve_path(self, path: str | Path) -> str: - """Resolve a relative path against the working directory.""" - path = str(path) - if os.path.isabs(path): - return path - return os.path.join(self._working_dir, path) + """Resolve a file path inside the working directory.""" + candidate = Path(path) + working_dir = self.working_dir.resolve() + if not candidate.is_absolute(): + candidate = working_dir / candidate + + resolved = candidate.resolve() + if not resolved.is_relative_to(working_dir): + raise ValueError(f'Path escapes working directory: {path}') + return str(resolved) @staticmethod def _sync_read(path: str) -> bytes: diff --git a/tests/unittests/tools/test_local_environment.py b/tests/unittests/tools/test_local_environment.py index 0c1b52d380..4f1367b72a 100644 --- a/tests/unittests/tools/test_local_environment.py +++ b/tests/unittests/tools/test_local_environment.py @@ -76,6 +76,39 @@ async def test_write_creates_parent_dirs(self, env: LocalEnvironment): data = await env.read_file("sub/dir/file.txt") assert data == b"nested" + @pytest.mark.asyncio + async def test_absolute_path_inside_working_dir(self, env: LocalEnvironment): + """Absolute paths are accepted when they stay inside the workspace.""" + path = env.working_dir / "absolute.txt" + await env.write_file(path, "absolute") + data = await env.read_file(path) + assert data == b"absolute" + + @pytest.mark.asyncio + async def test_rejects_relative_path_escape(self, env: LocalEnvironment): + """Parent traversal cannot escape the workspace.""" + outside = env.working_dir.parent / "outside.txt" + outside.write_text("secret", encoding="utf-8") + + with pytest.raises(ValueError, match="escapes working directory"): + await env.read_file(Path("..") / outside.name) + + with pytest.raises(ValueError, match="escapes working directory"): + await env.write_file(Path("..") / "write-outside.txt", "nope") + + assert not (env.working_dir.parent / "write-outside.txt").exists() + + @pytest.mark.asyncio + async def test_rejects_absolute_path_outside_working_dir( + self, env: LocalEnvironment + ): + """Absolute paths outside the workspace are rejected.""" + outside = env.working_dir.parent / "outside-absolute.txt" + outside.write_text("secret", encoding="utf-8") + + with pytest.raises(ValueError, match="escapes working directory"): + await env.read_file(outside) + @pytest.mark.asyncio async def test_read_nonexistent_raises(self, env: LocalEnvironment): """Reading a missing file raises FileNotFoundError."""