From b8717f1f2bade62afecbb626017fc9f0ca47f55f Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Tue, 26 May 2026 12:40:50 +0900
Subject: [PATCH] Add create_sql_view method to Catalog

Add Catalog.create_sql_view, a convenience wrapper that builds a
ViewVersion holding a single SQL representation and delegates to
create_view. Cover it with integration tests and use it to simplify
test_load_view.
---
 pyiceberg/catalog/__init__.py                | 46 +++++++++++++++++++-
 tests/integration/test_catalog.py            | 17 ++++++++
 tests/integration/test_rest_catalog.py       | 18 +-------
 tests/integration/test_writes/test_writes.py | 27 ++++++++++++
 4 files changed, 90 insertions(+), 18 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 95ceaa539f..75f1da6afd 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -20,6 +20,7 @@
 import importlib
 import logging
 import re
+import time
 import uuid
 from abc import ABC, abstractmethod
 from collections.abc import Callable
@@ -71,7 +72,7 @@
 from pyiceberg.utils.config import Config, merge_config
 from pyiceberg.utils.properties import property_as_bool
 from pyiceberg.view import View
-from pyiceberg.view.metadata import ViewVersion
+from pyiceberg.view.metadata import SQLViewRepresentation, ViewVersion
 
 if TYPE_CHECKING:
     import pyarrow as pa
@@ -744,6 +745,49 @@ def create_view(
             ViewAlreadyExistsError: If a view with the name already exists.
         """
 
+    def create_sql_view(
+        self,
+        identifier: str | Identifier,
+        schema: Schema | pa.Schema,
+        dialect: str,
+        sql: str,
+        default_namespace: str | Identifier,
+        location: str | None = None,
+        properties: Properties = EMPTY_DICT,
+        default_catalog: str | None = None,
+    ) -> View:
+        """Create a view backed by a single SQL representation.
+
+        Args:
+            identifier (str | Identifier): View identifier.
+            schema (Schema | pa.Schema): View's schema.
+            dialect (str): SQL dialect for the view.
+            sql (str): SQL for the view.
+            default_namespace (str | Identifier): Default namespace name.
+            location (str | None): Location for the view. Optional Argument.
+            properties (Properties): View properties that can be a string based dictionary.
+            default_catalog (str | None): Default catalog name. Optional Argument.
+
+        Returns:
+            View: the created view instance.
+
+        Raises:
+            ViewAlreadyExistsError: If a view with the name already exists.
+        """
+        iceberg_schema = self._convert_schema_if_needed(schema)
+        namespace_tuple = Catalog.identifier_to_tuple(default_namespace)
+
+        view_version = ViewVersion(
+            version_id=1,
+            schema_id=iceberg_schema.schema_id,
+            timestamp_ms=int(time.time() * 1000),
+            summary={},  # TODO Set summary field like EnvironmentContext of Iceberg Java
+            representations=[SQLViewRepresentation(type="sql", dialect=dialect, sql=sql)],
+            default_catalog=default_catalog,
+            default_namespace=namespace_tuple,
+        )
+        return self.create_view(identifier, iceberg_schema, view_version, location, properties)
+
     @staticmethod
     def identifier_to_tuple(identifier: str | Identifier) -> Identifier:
         """Parse an identifier to a tuple.
diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py
index 4188ad83db..1fd5325ef4 100644
--- a/tests/integration/test_catalog.py
+++ b/tests/integration/test_catalog.py
@@ -653,6 +653,23 @@ def test_rest_create_view(
     assert rest_catalog.load_view(identifier).schema() == view.schema()
 
 
+@pytest.mark.integration
+def test_rest_create_sql_view(
+    rest_catalog: RestCatalog, example_view_metadata_v1: dict[str, Any], database_name: str, view_name: str
+) -> None:
+    identifier = (database_name, view_name)
+
+    rest_catalog.create_namespace_if_not_exists(database_name)
+    view = View(identifier, ViewMetadata.model_validate(example_view_metadata_v1))
+
+    assert not rest_catalog.view_exists(identifier)
+
+    rest_catalog.create_sql_view(identifier, view.schema(), "spark", "SELECT * FROM prod.db.table", "default")
+
+    assert rest_catalog.view_exists(identifier)
+    assert rest_catalog.load_view(identifier).schema() == view.schema()
+
+
 @pytest.mark.integration
 def test_rest_drop_view(
     rest_catalog: RestCatalog, example_view_metadata_v1: dict[str, Any], database_name: str, view_name: str
diff --git a/tests/integration/test_rest_catalog.py b/tests/integration/test_rest_catalog.py
index 05039a982e..753a2fb367 100644
--- a/tests/integration/test_rest_catalog.py
+++ b/tests/integration/test_rest_catalog.py
@@ -16,7 +16,6 @@
 # under the License.
 # pylint:disable=redefined-outer-name
 
-import time
 
 import pytest
 from pytest_lazy_fixtures import lf
@@ -24,7 +23,6 @@
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.exceptions import NoSuchViewError
 from pyiceberg.schema import Schema
-from pyiceberg.view.metadata import SQLViewRepresentation, ViewVersion
 
 TEST_NAMESPACE_IDENTIFIER = "TEST NS"
 
@@ -76,21 +74,7 @@ def test_load_view(catalog: RestCatalog, table_schema_nested: Schema, database_n
     if not catalog.namespace_exists(database_name):
         catalog.create_namespace(database_name)
 
-    view_version = ViewVersion(
-        version_id=1,
-        schema_id=1,
-        timestamp_ms=int(time.time() * 1000),
-        summary={},
-        representations=[
-            SQLViewRepresentation(
-                type="sql",
-                sql="SELECT 1 as some_col",
-                dialect="spark",
-            )
-        ],
-        default_namespace=["default"],
-    )
-    view = catalog.create_view(identifier, table_schema_nested, view_version=view_version)
+    view = catalog.create_sql_view(identifier, table_schema_nested, "spark", "SELECT 1 as some_col", "default")
     loaded_view = catalog.load_view(identifier)
     assert view == loaded_view
 
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 609c1863bc..cbbb34bffe 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -1799,6 +1799,33 @@ def test_create_view(
     session_catalog.drop_view(identifier)  # clean up
 
 
+@pytest.mark.integration
+def test_create_sql_view(
+    spark: SparkSession,
+    session_catalog: Catalog,
+) -> None:
+    # Create a view using the REST Catalog.
+    identifier = "default.some_view"
+    schema = pa.schema([pa.field("some_col", pa.int32())])
+    session_catalog.create_sql_view(
+        identifier=identifier,
+        schema=schema,
+        dialect="spark",
+        sql="SELECT 1 as some_col",
+        default_namespace="default",
+    )
+
+    # Ensure the view exists.
+    assert session_catalog.view_exists(identifier)
+
+    # Query the view in spark to ensure it was properly created.
+    df = spark.table(identifier)
+    assert df.count() == 1
+    assert df.collect()[0].some_col == 1
+
+    session_catalog.drop_view(identifier)  # clean up
+
+
 @pytest.mark.integration
 def test_view_exists(
     spark: SparkSession,