def _idna_ascii_host(hostname: str, context: str) -> str:
    """Return ``hostname`` as a lowercase IDNA (ACE) ASCII string.

    Trailing dots are stripped before UTS #46 encoding so that Unicode and
    punycode spellings of one name compare equal in eTLD+1 checks.

    Args:
        hostname: Hostname to normalize.
        context: Label placed at the front of the error message to identify
            which input was malformed.

    Raises:
        AdagentsValidationError: If ``idna`` rejects the hostname.
    """
    without_root_dot = hostname.rstrip(".")
    try:
        ace_bytes = idna.encode(without_root_dot, uts46=True)
    except idna.IDNAError as exc:
        raise AdagentsValidationError(
            f"{context} has invalid IDNA hostname: {hostname!r}"
        ) from exc
    return ace_bytes.decode("ascii").lower()


def _resolve_well_known_redirect_url(
    *,
    current_url: str,
    location: str | None,
    original_hostname: str,
) -> str:
    """Resolve and vet the ``Location`` of a well-known redirect hop.

    The header value is resolved against ``current_url`` (so relative
    locations work) and accepted only if it is HTTPS, its host passes
    ``_check_safe_host``, and it shares a registrable domain with
    ``original_hostname`` after IDNA normalization of both names.

    Args:
        current_url: URL whose response carried the redirect.
        location: Raw ``Location`` header value, or ``None`` if absent.
        original_hostname: Publisher hostname the fetch started from.

    Returns:
        The absolute URL of the next hop.

    Raises:
        AdagentsValidationError: If the header is missing or empty, the
            target is not HTTPS, a hostname is not valid IDNA, or the target
            leaves the original registrable domain. Whatever
            ``_check_safe_host`` raises for a rejected host also propagates.
    """
    redirect_context = "adagents.json redirect"

    if not location:
        raise AdagentsValidationError("adagents.json redirect missing Location header")

    candidate_url = urljoin(current_url, location)
    candidate_parts = urlparse(candidate_url)
    if candidate_parts.scheme != "https":
        raise AdagentsValidationError(
            f"adagents.json redirect must be an HTTPS URL: {candidate_url!r}"
        )

    candidate_host = candidate_parts.hostname or ""
    _check_safe_host(candidate_host, redirect_context)

    # Compare ACE forms so "bücher.com" and "xn--bcher-kva.com" are one site.
    publisher_ace = _idna_ascii_host(original_hostname, "publisher_domain")
    candidate_ace = _idna_ascii_host(candidate_host, redirect_context)
    if same_registrable_domain(publisher_ace, candidate_ace):
        return candidate_url

    raise AdagentsValidationError(
        "adagents.json redirect must stay within the original registrable domain"
    )
normalize_url(url: str) -> str: """Normalize URL by removing protocol and trailing slash. @@ -567,6 +603,11 @@ def verify_agent_authorization( # Maximum number of authoritative_location redirects to follow MAX_REDIRECT_DEPTH = 5 +# Maximum number of HTTP redirects to follow for the initial +# /.well-known/adagents.json fetch. These redirects are not delegation; they +# only cover same-site hosting normalization such as apex -> www. +MAX_WELL_KNOWN_REDIRECT_HOPS = 3 + # Maximum size of a publisher's ads.txt file. IAB practice caps real # ads.txt files in the low MB range; this gives plenty of headroom while # preventing a hostile publisher from forcing the SDK to buffer an @@ -661,14 +702,25 @@ async def _resolve_direct( hop_cache = cache_entry if not is_redirect else None try: - data, etag, last_modified, not_modified = await _fetch_adagents_url( - url, - timeout, - user_agent, - fetch_client, - max_bytes=max_bytes, - cache_entry=hop_cache, - ) + if is_redirect: + data, etag, last_modified, not_modified = await _fetch_adagents_url( + url, + timeout, + user_agent, + fetch_client, + max_bytes=max_bytes, + cache_entry=hop_cache, + ) + else: + data, etag, last_modified, not_modified = await _fetch_well_known_adagents_url( + url, + timeout, + user_agent, + fetch_client, + original_hostname=publisher_domain, + max_bytes=max_bytes, + cache_entry=hop_cache, + ) except AdagentsNotFoundError: # A 404 on a followed authoritative_location target is a broken # redirect chain, not a missing publisher manifest. Surface it as @@ -1087,13 +1139,84 @@ async def _fetch_adagents_url( :data:`MAX_AUTHORITATIVE_BYTES` for dereferenced authoritative files per adcp#4504). 
async def _fetch_well_known_adagents_url(
    url: str,
    timeout: float,
    user_agent: str,
    client: httpx.AsyncClient | None,
    *,
    original_hostname: str,
    max_bytes: int = MAX_POINTER_BYTES,
    cache_entry: AdagentsCacheEntry | None = None,
) -> tuple[dict[str, Any], str | None, str | None, bool]:
    """Fetch the initial well-known URL, following vetted HTTP redirects.

    Redirect responses (301/302/303/307/308) are followed manually: each
    ``Location`` is checked by ``_resolve_well_known_redirect_url`` before
    the next request is issued. At most ``MAX_WELL_KNOWN_REDIRECT_HOPS``
    redirects are followed, and revisiting a URL is treated as a loop.

    Args:
        url: The publisher's ``/.well-known/adagents.json`` URL.
        timeout: Per-request timeout forwarded to the fetch primitive.
        user_agent: Value for the ``User-Agent`` request header.
        client: Caller-supplied client, used for the first request only.
        original_hostname: Publisher hostname that redirect targets must
            share a registrable domain with.
        max_bytes: Response size cap forwarded to the fetch primitive.
        cache_entry: Cached validators, applied to the first request only.

    Returns:
        The tuple produced by ``_parse_adagents_response`` for the first
        non-redirect response; callers unpack it as
        ``(data, etag, last_modified, not_modified)``.

    Raises:
        AdagentsValidationError: On a circular redirect, on exceeding the
            hop cap, or when a redirect target is rejected.
    """
    request_headers = _adagents_headers(user_agent, cache_entry)
    target_url = url
    active_client = client
    active_cache = cache_entry
    seen_urls: set[str] = set()

    for hop_index in range(MAX_WELL_KNOWN_REDIRECT_HOPS + 1):
        if target_url in seen_urls:
            raise AdagentsValidationError("Circular redirect detected in adagents.json fetch")
        seen_urls.add(target_url)

        payload, status, reply_headers = await _fetch_adagents_response(
            target_url,
            timeout,
            request_headers,
            active_client,
            max_bytes,
        )

        is_http_redirect = status in (301, 302, 303, 307, 308)
        if not is_http_redirect:
            return _parse_adagents_response(
                target_url, payload, status, reply_headers, active_cache
            )

        if hop_index >= MAX_WELL_KNOWN_REDIRECT_HOPS:
            raise AdagentsValidationError(
                f"Maximum well-known redirect hops ({MAX_WELL_KNOWN_REDIRECT_HOPS}) exceeded"
            )

        target_url = _resolve_well_known_redirect_url(
            current_url=target_url,
            location=reply_headers.get("location"),
            original_hostname=original_hostname,
        )
        # The caller's client and conditional validators belong to the
        # publisher's first URL only. From here on pass no client, so the
        # fetch primitive supplies its own, and send no cache validators.
        active_client = None
        active_cache = None
        request_headers = _adagents_headers(user_agent, None)

    raise AssertionError("Unreachable")  # pragma: no cover


def _adagents_headers(user_agent: str, cache_entry: AdagentsCacheEntry | None) -> dict[str, str]:
    """Build request headers for an adagents.json fetch.

    ``User-Agent`` is always present. When ``cache_entry`` is given, its
    non-empty ``etag`` and ``last_modified`` values are added as
    ``If-None-Match`` and ``If-Modified-Since`` respectively.
    """
    request_headers: dict[str, str] = {"User-Agent": user_agent}
    if cache_entry is None:
        return request_headers

    conditional_validators = (
        ("If-None-Match", cache_entry.etag),
        ("If-Modified-Since", cache_entry.last_modified),
    )
    for header_name, validator in conditional_validators:
        if validator:
            request_headers[header_name] = validator
    return request_headers
@pytest.mark.asyncio
async def test_well_known_follows_same_registrable_redirect(self, monkeypatch):
    """An apex -> www 301 on the initial well-known fetch is followed.

    The caller-supplied client must see only the apex request; the redirect
    target must go through the client returned by the patched
    ``_owned_pinned_client`` factory.
    """
    import adcp.adagents as adagents_module
    from adcp.adagents import fetch_adagents

    apex_url = "https://example.com/.well-known/adagents.json"
    www_url = "https://www.example.com/.well-known/adagents.json"
    manifest = {
        "authorized_agents": [
            {
                "url": "https://agent.example.com",
                "authorized_for": "All properties",
                "authorization_type": "property_ids",
                "property_ids": ["site1"],
            }
        ]
    }

    apex_requests: list[str] = []
    www_requests: list[str] = []
    apex_client = make_url_dispatching_client(
        {apex_url: (None, 301, {"location": www_url})},
        called_urls=apex_requests,
    )
    www_client = make_url_dispatching_client(
        {www_url: manifest},
        called_urls=www_requests,
    )

    def pinned_client_factory(url, timeout):
        return www_client

    monkeypatch.setattr(adagents_module, "_owned_pinned_client", pinned_client_factory)

    fetched = await fetch_adagents("example.com", client=apex_client)

    assert fetched == manifest
    assert apex_requests == [apex_url]
    assert www_requests == [www_url]
@pytest.mark.asyncio
async def test_well_known_rejects_https_downgrade_redirect(self):
    """A redirect from the well-known URL to plain ``http://`` is refused."""
    from adcp.adagents import fetch_adagents

    downgrade_response = (
        None,
        302,
        {"location": "http://www.example.com/.well-known/adagents.json"},
    )
    routes = {"https://example.com/.well-known/adagents.json": downgrade_response}
    mock_client = make_url_dispatching_client(routes)

    with pytest.raises(AdagentsValidationError, match="HTTPS"):
        await fetch_adagents("example.com", client=mock_client)
@pytest.mark.asyncio
async def test_well_known_rejects_cross_registrable_redirect(self):
    """A redirect to a different registrable domain is refused."""
    from adcp.adagents import fetch_adagents

    cross_site_response = (
        None,
        302,
        {"location": "https://attacker.example.net/.well-known/adagents.json"},
    )
    routes = {"https://example.com/.well-known/adagents.json": cross_site_response}
    mock_client = make_url_dispatching_client(routes)

    with pytest.raises(AdagentsValidationError, match="registrable domain"):
        await fetch_adagents("example.com", client=mock_client)
@pytest.mark.asyncio
async def test_well_known_detects_circular_same_registrable_redirect(self, monkeypatch):
    """An apex <-> www redirect loop is reported as a circular redirect."""
    import adcp.adagents as adagents_module
    from adcp.adagents import fetch_adagents

    apex_url = "https://example.com/.well-known/adagents.json"
    www_url = "https://www.example.com/.well-known/adagents.json"
    to_www = (None, 302, {"location": www_url})
    to_apex = (None, 302, {"location": apex_url})

    caller_client = make_url_dispatching_client({apex_url: to_www})
    # The patched client answers both hosts so the loop can close.
    looping_client = make_url_dispatching_client({www_url: to_apex, apex_url: to_www})

    def pinned_client_factory(url, timeout):
        return looping_client

    monkeypatch.setattr(adagents_module, "_owned_pinned_client", pinned_client_factory)

    with pytest.raises(AdagentsValidationError, match="Circular redirect"):
        await fetch_adagents("example.com", client=caller_client)
@pytest.mark.asyncio
async def test_authoritative_location_http_redirect_is_refused(self, monkeypatch):
    """authoritative_location names the exact URL; HTTP 30x is not followed."""
    import adcp.adagents as adagents_module
    from adcp.adagents import fetch_adagents

    well_known_url = "https://example.com/.well-known/adagents.json"
    authoritative_url = "https://cdn.example.com/adagents.json"

    # The well-known document delegates via authoritative_location.
    pointer_document = {
        "$schema": "/schemas/2.6.0/adagents.json",
        "authoritative_location": authoritative_url,
        "last_updated": "2025-01-15T10:00:00Z",
    }
    mock_client = make_url_dispatching_client({well_known_url: pointer_document})

    # The delegated URL itself answers with a 302.
    bounce = (None, 302, {"location": "https://cdn.example.com/final.json"})
    authoritative_client = make_url_dispatching_client({authoritative_url: bounce})

    def pinned_client_factory(url, timeout):
        return authoritative_client

    monkeypatch.setattr(adagents_module, "_owned_pinned_client", pinned_client_factory)

    with pytest.raises(AdagentsValidationError, match="HTTP 302"):
        await fetch_adagents("example.com", client=mock_client)
that uses HTTP instead of HTTPS.""" @@ -3085,7 +3411,7 @@ def handler(url): return self._not_found() if url == "https://publisher.example/ads.txt": return self._text( - "MANAGERDOMAIN=bad-manager.example\n" "MANAGERDOMAIN=good-manager.example\n" + "MANAGERDOMAIN=bad-manager.example\nMANAGERDOMAIN=good-manager.example\n" ) raise AssertionError(f"unexpected url {url}") @@ -4380,7 +4706,7 @@ def handler(request: httpx.Request) -> httpx.Response: assert captured["method"] == "GET" assert captured["url"] == ( - "https://aao.example.com/v1/agents/" "https%3A%2F%2Fagent.example.com%2F/publishers" + "https://aao.example.com/v1/agents/https%3A%2F%2Fagent.example.com%2F/publishers" ) assert isinstance(result, AgentAuthorizationsDirectoryResult) assert result.agent_url == "https://agent.example.com/"