From 9d85042b0fec350ebefd5cb7f626d315a95bf522 Mon Sep 17 00:00:00 2001 From: rahulsmahadev Date: Tue, 26 May 2026 23:20:28 +0000 Subject: [PATCH] REST: Add retry and timeout configuration for REST catalog Closes #2772. The REST Catalog uses requests with no retries and no timeout by default, so transient 5xx/network failures bubble up immediately and slow servers can hang the client indefinitely (e.g. a Polaris instance returning 504 from a proxy). Add an optional connection: block on the catalog properties: catalog: default: uri: http://rest-catalog/ws/ connection: timeout: 60 retry: total: 5 backoff_factor: 1.0 status_forcelist: [429, 500, 502, 503, 504] allowed_methods: [GET, HEAD, OPTIONS] connection.retry is passed verbatim to urllib3.util.retry.Retry. Both keys are optional and opt-in: when neither is set the default requests behavior is preserved. Signed-off-by: rahulsmahadev --- mkdocs/docs/configuration.md | 25 +++++++++++ pyiceberg/catalog/rest/__init__.py | 72 ++++++++++++++++++++++++++++-- tests/catalog/test_rest.py | 69 ++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 3 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 3f82f78895..fbb8265f54 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -348,6 +348,31 @@ catalog: | snapshot-loading-mode | refs | The snapshots to return in the body of the metadata. Setting the value to `all` would return the full set of snapshots currently valid for the table. Setting the value to `refs` would load all snapshots referenced by branches or tags. | | `header.X-Iceberg-Access-Delegation` | `vended-credentials` | Signal to the server that the client supports delegated access via a comma-separated list of access mechanisms. The server may choose to supply access via any or none of the requested mechanisms. When using `vended-credentials`, the server provides temporary credentials to the client. When using `remote-signing`, the server signs requests on behalf of the client. (default: `vended-credentials`) | +#### Retry and timeout + +The REST Catalog uses `requests` with no retries and no timeout by default, so transient +5xx/network failures bubble up immediately and slow servers can hang the client indefinitely. +Set a `connection:` block on the catalog to opt in to a per-request timeout and a retry policy. +Both keys are optional; when neither is set, the default `requests` behavior is preserved. + +```yaml +catalog: + default: + uri: http://rest-catalog/ws/ + connection: + timeout: 60 # seconds, applied to every HTTP call + retry: + total: 5 + backoff_factor: 1.0 + status_forcelist: [429, 500, 502, 503, 504] + allowed_methods: [GET, HEAD, OPTIONS] +``` + +| Key | Example | Description | +| ---------------------------- | ------------------------------------ | ------------------------------------------------------------------------------------------------------ | +| connection.timeout | 60 | Per-request timeout in seconds. Must be a positive number. | +| connection.retry | `{total: 5, backoff_factor: 1.0}` | Mapping passed verbatim as kwargs to [`urllib3.util.retry.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry). | + #### Headers in REST Catalog To configure custom headers in REST Catalog, include them in the catalog properties with `header.`. This diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 39954ef561..e0457e17e7 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -25,9 +25,11 @@ from urllib.parse import quote, unquote from pydantic import ConfigDict, Field, TypeAdapter, field_validator -from requests import HTTPError, Session +from requests import HTTPError, PreparedRequest, Response, Session +from requests.adapters import HTTPAdapter from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt from typing_extensions import override +from urllib3.util.retry import Retry from pyiceberg import __version__ from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary @@ -255,6 +257,9 @@ class ScanPlanningMode(Enum): SIGV4_SERVICE = "rest.signing-name" SIGV4_MAX_RETRIES = "rest.sigv4.max-retries" SIGV4_MAX_RETRIES_DEFAULT = 10 +CONNECTION = "connection" +CONNECTION_TIMEOUT = "timeout" +CONNECTION_RETRY = "retry" EMPTY_BODY_SHA256: str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" OAUTH2_SERVER_URI = "oauth2-server-uri" SNAPSHOT_LOADING_MODE = "snapshot-loading-mode" @@ -392,6 +397,63 @@ class ListViewsResponse(IcebergBaseModel): _PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse) +class _RetryTimeoutHTTPAdapter(HTTPAdapter): + """HTTPAdapter that applies a default per-request timeout. + + requests does not provide a way to set a default timeout on a Session; + without this adapter, every call would have to thread `timeout=` through. + The adapter applies `self._timeout` whenever a per-call timeout is not set. + """ + + def __init__(self, timeout: float | None = None, max_retries: Retry | int | None = None) -> None: + self._timeout = timeout + if max_retries is not None: + super().__init__(max_retries=max_retries) + else: + super().__init__() + + def send(self, request: PreparedRequest, **kwargs: Any) -> Response: + if kwargs.get("timeout") is None and self._timeout is not None: + kwargs["timeout"] = self._timeout + return super().send(request, **kwargs) + + +def _create_connection_adapter(properties: Properties) -> _RetryTimeoutHTTPAdapter | None: + """Build a connection adapter from the optional `connection.*` properties. + + Returns None when no `connection` block is supplied, leaving the default + Session behavior unchanged. Raises ValueError on invalid input. + """ + connection_config = properties.get(CONNECTION) + if not connection_config: + return None + if not isinstance(connection_config, dict): + raise ValueError(f"`{CONNECTION}` must be a mapping, got: {type(connection_config).__name__}") + + timeout: float | None = None + if (raw_timeout := connection_config.get(CONNECTION_TIMEOUT)) is not None: + try: + timeout = float(raw_timeout) + except (TypeError, ValueError) as e: + raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a number, got: {raw_timeout!r}") from e + if timeout <= 0: + raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a positive number, got: {timeout}") + + retry: Retry | None = None + if (retry_config := connection_config.get(CONNECTION_RETRY)) is not None: + if not isinstance(retry_config, dict): + raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRY}` must be a mapping, got: {type(retry_config).__name__}") + try: + retry = Retry(**retry_config) + except TypeError as e: + raise ValueError(f"Invalid `{CONNECTION}.{CONNECTION_RETRY}` configuration: {e}") from e + + if timeout is None and retry is None: + return None + + return _RetryTimeoutHTTPAdapter(timeout=timeout, max_retries=retry) + + class RestCatalog(Catalog): uri: str _session: Session @@ -418,6 +480,12 @@ def _create_session(self) -> Session: """Create a request session with provided catalog configuration.""" session = Session() + # Mount the retry/timeout adapter when `connection.*` properties are set. + # SigV4's adapter mounted below at `self.uri` is a longer prefix and still wins for that host. + if (connection_adapter := _create_connection_adapter(self.properties)) is not None: + session.mount("http://", connection_adapter) + session.mount("https://", connection_adapter) + # Set HTTP headers self._config_headers(session) @@ -763,8 +831,6 @@ def _init_sigv4(self, session: Session) -> None: import boto3 from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest - from requests import PreparedRequest - from requests.adapters import HTTPAdapter class SigV4Adapter(HTTPAdapter): def __init__(self, **properties: str): diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index df2f96a392..978ce40777 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -32,6 +32,9 @@ import pyiceberg from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog from pyiceberg.catalog.rest import ( + CONNECTION, + CONNECTION_RETRY, + CONNECTION_TIMEOUT, DEFAULT_ENDPOINTS, EMPTY_BODY_SHA256, OAUTH2_SERVER_URI, @@ -43,6 +46,7 @@ HttpMethod, RestCatalog, ScanPlanningMode, + _RetryTimeoutHTTPAdapter, ) from pyiceberg.exceptions import ( AuthorizationExpiredError, @@ -2019,6 +2023,71 @@ def test_request_session_with_ssl_client_cert() -> None: assert "Could not find the TLS certificate file, invalid path: path_to_client_cert" in str(e.value) +def test_session_without_connection_config_uses_default_adapter(rest_mock: Mocker) -> None: + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + for adapter in catalog._session.adapters.values(): + assert not isinstance(adapter, _RetryTimeoutHTTPAdapter) + + +def test_session_with_connection_timeout_and_retry(rest_mock: Mocker) -> None: + catalog_properties = { + "uri": TEST_URI, + "token": TEST_TOKEN, + CONNECTION: { + CONNECTION_TIMEOUT: 60, + CONNECTION_RETRY: { + "total": 5, + "backoff_factor": 1.0, + "status_forcelist": [429, 500, 502, 503, 504], + "allowed_methods": ["GET", "HEAD", "OPTIONS"], + }, + }, + } + catalog = RestCatalog("rest", **catalog_properties) # type: ignore + + https_adapter = catalog._session.adapters["https://"] + http_adapter = catalog._session.adapters["http://"] + assert isinstance(https_adapter, _RetryTimeoutHTTPAdapter) + assert https_adapter is http_adapter + assert https_adapter._timeout == 60.0 + assert https_adapter.max_retries.total == 5 + assert https_adapter.max_retries.backoff_factor == 1.0 + assert https_adapter.max_retries.status_forcelist == [429, 500, 502, 503, 504] + assert set(https_adapter.max_retries.allowed_methods) == {"GET", "HEAD", "OPTIONS"} + + +def test_session_with_connection_timeout_only(rest_mock: Mocker) -> None: + catalog_properties = { + "uri": TEST_URI, + "token": TEST_TOKEN, + CONNECTION: {CONNECTION_TIMEOUT: "30"}, + } + catalog = RestCatalog("rest", **catalog_properties) # type: ignore + adapter = catalog._session.adapters["https://"] + assert isinstance(adapter, _RetryTimeoutHTTPAdapter) + assert adapter._timeout == 30.0 + + +def test_session_with_invalid_connection_timeout_raises(rest_mock: Mocker) -> None: + catalog_properties = { + "uri": TEST_URI, + "token": TEST_TOKEN, + CONNECTION: {CONNECTION_TIMEOUT: -1}, + } + with pytest.raises(ValueError, match="`connection.timeout` must be a positive number"): + RestCatalog("rest", **catalog_properties) # type: ignore + + +def test_session_with_invalid_connection_retry_kwarg_raises(rest_mock: Mocker) -> None: + catalog_properties = { + "uri": TEST_URI, + "token": TEST_TOKEN, + CONNECTION: {CONNECTION_RETRY: {"bogus_kwarg": 1}}, + } + with pytest.raises(ValueError, match="Invalid `connection.retry` configuration"): + RestCatalog("rest", **catalog_properties) # type: ignore + + def test_rest_catalog_with_basic_auth_type(rest_mock: Mocker) -> None: # Given rest_mock.get(