Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 144 additions & 13 deletions src/adcp/adagents.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Literal
from urllib.parse import quote, urlparse
from urllib.parse import quote, urljoin, urlparse

import httpx
import idna
from pydantic import Field

from adcp.exceptions import (
Expand All @@ -28,6 +29,7 @@
AdagentsTimeoutError,
AdagentsValidationError,
)
from adcp.signing.etld import same_registrable_domain
from adcp.types.base import AdCPBaseModel
from adcp.validation import ValidationError, validate_adagents

Expand Down Expand Up @@ -373,6 +375,40 @@ def _validate_redirect_url(url: str) -> None:
_check_safe_host(parsed.hostname or "", "authoritative_location")


def _idna_ascii_host(hostname: str, context: str) -> str:
    """Return ``hostname`` as a lowercase, ASCII-only IDNA string.

    Trailing dots are stripped before encoding, and the ``idna`` encoder is
    run with ``uts46=True``. The result is what the eTLD+1 comparison of
    redirect targets operates on.

    Args:
        hostname: Hostname to normalize; may contain non-ASCII labels.
        context: Label naming the field being validated; it prefixes the
            error message.

    Returns:
        The encoded hostname, decoded to ``str`` and lowercased.

    Raises:
        AdagentsValidationError: If ``idna`` rejects the hostname. The
            message quotes the hostname exactly as it was passed in.
    """
    without_trailing_dots = hostname.rstrip(".")
    try:
        encoded = idna.encode(without_trailing_dots, uts46=True)
    except idna.IDNAError as exc:
        raise AdagentsValidationError(
            f"{context} has invalid IDNA hostname: {hostname!r}"
        ) from exc
    return encoded.decode("ascii").lower()


def _resolve_well_known_redirect_url(
    *,
    current_url: str,
    location: str | None,
    original_hostname: str,
) -> str:
    """Resolve a redirect ``Location`` and check it before it is followed.

    Args:
        current_url: URL of the response that carried the redirect; a
            relative ``location`` is resolved against it.
        location: Raw ``Location`` header value, or ``None`` when absent.
        original_hostname: Hostname of the publisher the fetch started from.

    Returns:
        The absolute URL of the next hop.

    Raises:
        AdagentsValidationError: If ``location`` is missing or empty, the
            resolved URL is not HTTPS, either hostname fails IDNA
            normalization, or the target is outside the publisher's
            registrable domain. ``_check_safe_host`` may also reject the
            target host (its exact checks are not visible from here).
    """
    if not location:
        raise AdagentsValidationError("adagents.json redirect missing Location header")

    # Location may be relative, so anchor it to the URL that issued it.
    resolved = urljoin(current_url, location)
    parts = urlparse(resolved)
    if parts.scheme != "https":
        raise AdagentsValidationError(
            f"adagents.json redirect must be an HTTPS URL: {resolved!r}"
        )

    redirect_host = parts.hostname or ""
    _check_safe_host(redirect_host, "adagents.json redirect")

    # Compare both hosts in the same normalized ASCII form.
    publisher_ascii = _idna_ascii_host(original_hostname, "publisher_domain")
    redirect_ascii = _idna_ascii_host(redirect_host, "adagents.json redirect")
    if same_registrable_domain(publisher_ascii, redirect_ascii):
        return resolved

    raise AdagentsValidationError(
        "adagents.json redirect must stay within the original registrable domain"
    )


def normalize_url(url: str) -> str:
"""Normalize URL by removing protocol and trailing slash.

Expand Down Expand Up @@ -567,6 +603,11 @@ def verify_agent_authorization(
# Maximum number of `authoritative_location` redirects (delegation via the
# JSON field, not HTTP 30x) to follow.
MAX_REDIRECT_DEPTH = 5

# Maximum number of HTTP 30x redirects to follow for the initial
# /.well-known/adagents.json fetch. These redirects are not delegation; they
# only cover same-site hosting normalization such as apex -> www. The fetch
# fails with a validation error if yet another redirect is returned after
# this many have already been followed.
MAX_WELL_KNOWN_REDIRECT_HOPS = 3

# Maximum size of a publisher's ads.txt file. IAB practice caps real
# ads.txt files in the low MB range; this gives plenty of headroom while
# preventing a hostile publisher from forcing the SDK to buffer an
Expand Down Expand Up @@ -661,14 +702,25 @@ async def _resolve_direct(
hop_cache = cache_entry if not is_redirect else None

try:
data, etag, last_modified, not_modified = await _fetch_adagents_url(
url,
timeout,
user_agent,
fetch_client,
max_bytes=max_bytes,
cache_entry=hop_cache,
)
if is_redirect:
data, etag, last_modified, not_modified = await _fetch_adagents_url(
url,
timeout,
user_agent,
fetch_client,
max_bytes=max_bytes,
cache_entry=hop_cache,
)
else:
data, etag, last_modified, not_modified = await _fetch_well_known_adagents_url(
url,
timeout,
user_agent,
fetch_client,
original_hostname=publisher_domain,
max_bytes=max_bytes,
cache_entry=hop_cache,
)
except AdagentsNotFoundError:
# A 404 on a followed authoritative_location target is a broken
# redirect chain, not a missing publisher manifest. Surface it as
Expand Down Expand Up @@ -1087,13 +1139,84 @@ async def _fetch_adagents_url(
:data:`MAX_AUTHORITATIVE_BYTES` for dereferenced authoritative files
per adcp#4504).
"""
headers = _adagents_headers(user_agent, cache_entry)
body, status_code, response_headers = await _fetch_adagents_response(
url, timeout, headers, client, max_bytes
)
return _parse_adagents_response(url, body, status_code, response_headers, cache_entry)


async def _fetch_well_known_adagents_url(
    url: str,
    timeout: float,
    user_agent: str,
    client: httpx.AsyncClient | None,
    *,
    original_hostname: str,
    max_bytes: int = MAX_POINTER_BYTES,
    cache_entry: AdagentsCacheEntry | None = None,
) -> tuple[dict[str, Any], str | None, str | None, bool]:
    """Fetch the initial well-known URL, following vetted HTTP redirects.

    A 301/302/303/307/308 response is followed only after its ``Location``
    passes ``_resolve_well_known_redirect_url``. Any other status is handed
    to ``_parse_adagents_response`` for the URL that produced it.

    Args:
        url: The publisher's well-known adagents.json URL.
        timeout: Timeout forwarded to every request.
        user_agent: Value sent as the ``User-Agent`` header.
        client: Caller-supplied client; used for the first request only.
        original_hostname: Publisher hostname that redirect targets are
            checked against.
        max_bytes: Response size cap forwarded to every request.
        cache_entry: Cached validators; applied to the first request only.

    Returns:
        Whatever ``_parse_adagents_response`` returns for the final hop.

    Raises:
        AdagentsValidationError: If a URL is visited twice, if more than
            ``MAX_WELL_KNOWN_REDIRECT_HOPS`` redirects are returned, or if a
            redirect target is rejected.
    """
    redirect_statuses = frozenset({301, 302, 303, 307, 308})
    seen_urls: set[str] = set()

    # State for the first request: caller's client and conditional headers.
    target_url = url
    hop_client = client
    hop_cache = cache_entry
    request_headers = _adagents_headers(user_agent, cache_entry)
    redirects_followed = 0

    while True:
        if target_url in seen_urls:
            raise AdagentsValidationError("Circular redirect detected in adagents.json fetch")
        seen_urls.add(target_url)

        body, status_code, response_headers = await _fetch_adagents_response(
            target_url, timeout, request_headers, hop_client, max_bytes
        )
        if status_code not in redirect_statuses:
            return _parse_adagents_response(
                target_url, body, status_code, response_headers, hop_cache
            )

        # Another redirect arrived; refuse it once the hop budget is spent.
        if redirects_followed >= MAX_WELL_KNOWN_REDIRECT_HOPS:
            raise AdagentsValidationError(
                f"Maximum well-known redirect hops ({MAX_WELL_KNOWN_REDIRECT_HOPS}) exceeded"
            )

        target_url = _resolve_well_known_redirect_url(
            current_url=target_url,
            location=response_headers.get("location"),
            original_hostname=original_hostname,
        )
        redirects_followed += 1

        # Only the publisher's first URL may use the caller's client and
        # conditional validators. Later hops pass client=None, leaving client
        # setup to _fetch_adagents_response, and send no cache headers.
        hop_client = None
        hop_cache = None
        request_headers = _adagents_headers(user_agent, None)


def _adagents_headers(user_agent: str, cache_entry: AdagentsCacheEntry | None) -> dict[str, str]:
    """Build the request headers for an adagents.json fetch.

    Args:
        user_agent: Value for the ``User-Agent`` header.
        cache_entry: Previously cached response metadata, or ``None`` to
            send an unconditional request.

    Returns:
        A new dict that always contains ``User-Agent``. ``If-None-Match``
        and ``If-Modified-Since`` are added when the cache entry has a
        truthy ``etag`` / ``last_modified`` respectively.
    """
    request_headers: dict[str, str] = {"User-Agent": user_agent}
    if cache_entry is None:
        return request_headers

    # Conditional-request validators, in the order they are emitted.
    validators = (
        ("If-None-Match", cache_entry.etag),
        ("If-Modified-Since", cache_entry.last_modified),
    )
    for header_name, validator in validators:
        if validator:
            request_headers[header_name] = validator
    return request_headers


async def _fetch_adagents_response(
url: str,
timeout: float,
headers: dict[str, str],
client: httpx.AsyncClient | None,
max_bytes: int,
) -> tuple[bytes, int, httpx.Headers]:
parsed = urlparse(url)
await _dns_validate_host(
parsed.hostname or "", parsed.port or (443 if parsed.scheme == "https" else 80)
Expand Down Expand Up @@ -1121,7 +1244,16 @@ async def _fetch_adagents_url(
raise AdagentsTimeoutError(parsed.netloc, timeout) from e
except httpx.RequestError as e:
raise AdagentsValidationError(f"Failed to fetch adagents.json: {e}") from e
return body, status_code, response_headers


def _parse_adagents_response(
url: str,
body: bytes,
status_code: int,
response_headers: httpx.Headers,
cache_entry: AdagentsCacheEntry | None,
) -> tuple[dict[str, Any], str | None, str | None, bool]:
if status_code == 304:
if cache_entry is None:
# The server should not return 304 without a conditional
Expand Down Expand Up @@ -1208,10 +1340,9 @@ async def _stream_capped(
rejected up-front; servers that omit the header (or lie) are still
caught by the running total inside the loop.
"""
# follow_redirects=False: HTTP 30x is not how adagents.json delegates.
# Cross-host delegation goes through the explicit `authoritative_location`
# field, which passes through _validate_redirect_url. Allowing httpx to
# transparently follow 30x would bypass that SSRF gate.
# Always disable httpx's transparent redirect handling. Callers that opt
# into a redirect policy must inspect Location and re-enter this primitive
# after applying scheme, eTLD+1, DNS, and SSRF gates for the next hop.
async with client.stream(
"GET", url, headers=headers, timeout=timeout, follow_redirects=False
) as response:
Expand Down
Loading
Loading