From 60177029fc62603498d9a415119440fd587f3728 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 2 Jun 2026 17:53:39 +0800 Subject: [PATCH] Revert "feature: add more object storage backends (#346)" This reverts commit eae07614ea6f5e578bf5974214afdca703f8b496. --- crates/paimon/Cargo.toml | 16 +- .../src/catalog/rest/rest_token_file_io.rs | 2 +- crates/paimon/src/io/file_io.rs | 156 +------ crates/paimon/src/io/mod.rs | 29 -- crates/paimon/src/io/storage.rs | 239 +++-------- crates/paimon/src/io/storage_azdls.rs | 405 ------------------ crates/paimon/src/io/storage_config.rs | 60 --- crates/paimon/src/io/storage_cos.rs | 136 ------ crates/paimon/src/io/storage_gcs.rs | 201 --------- crates/paimon/src/io/storage_obs.rs | 133 ------ crates/paimon/src/io/storage_s3.rs | 76 +++- docs/src/architecture.md | 2 +- docs/src/getting-started.md | 36 -- docs/src/index.md | 2 +- 14 files changed, 124 insertions(+), 1369 deletions(-) delete mode 100644 crates/paimon/src/io/storage_azdls.rs delete mode 100644 crates/paimon/src/io/storage_config.rs delete mode 100644 crates/paimon/src/io/storage_cos.rs delete mode 100644 crates/paimon/src/io/storage_gcs.rs delete mode 100644 crates/paimon/src/io/storage_obs.rs diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index eeae73a8..184c25ee 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -30,17 +30,7 @@ version.workspace = true [features] default = ["storage-memory", "storage-fs", "storage-oss"] -storage-all = [ - "storage-memory", - "storage-fs", - "storage-oss", - "storage-s3", - "storage-cos", - "storage-azdls", - "storage-obs", - "storage-gcs", - "storage-hdfs", -] +storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3", "storage-hdfs"] fulltext = ["tantivy", "tempfile"] vortex = ["dep:vortex", "dep:kanal"] @@ -48,10 +38,6 @@ storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] storage-oss = ["opendal/services-oss"] storage-s3 = ["opendal/services-s3"] -storage-cos = ["opendal/services-cos"] -storage-azdls = ["opendal/services-azdls"] -storage-obs = ["opendal/services-obs"] -storage-gcs = ["opendal/services-gcs"] storage-hdfs = ["opendal/services-hdfs-native"] [dependencies] diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs b/crates/paimon/src/catalog/rest/rest_token_file_io.rs index 502a0d99..6233eb10 100644 --- a/crates/paimon/src/catalog/rest/rest_token_file_io.rs +++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs @@ -29,6 +29,7 @@ use crate::api::rest_api::RESTApi; use crate::api::rest_util::RESTUtil; use crate::catalog::Identifier; use crate::common::{CatalogOptions, Options}; +use crate::io::storage_oss::OSS_ENDPOINT; use crate::io::FileIO; use crate::Result; @@ -36,7 +37,6 @@ use super::rest_token::RESTToken; /// Safe time margin (in milliseconds) before token expiration to trigger refresh. const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000; -const OSS_ENDPOINT: &str = "fs.oss.endpoint"; /// A FileIO wrapper that supports getting data access tokens from a REST Server. /// diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index 2144ed75..7e78004c 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -148,7 +148,7 @@ impl FileIO { statuses.push(FileStatus { size: meta.content_length(), is_dir: meta.is_dir(), - path: status_path(base_path, entry_path), + path: format!("{base_path}{entry_path}"), last_modified: meta .last_modified() .map(|v| DateTime::::from(SystemTime::from(v))), @@ -186,7 +186,7 @@ impl FileIO { statuses.push(FileStatus { size: meta.content_length(), is_dir: false, - path: status_path(base_path, entry_path), + path: format!("{base_path}{entry_path}"), last_modified: meta .last_modified() .map(|v| DateTime::::from(SystemTime::from(v))), @@ -280,14 +280,6 @@ impl FileIO { } } -fn status_path(base_path: &str, entry_path: &str) -> String { - if base_path.ends_with('/') || entry_path.starts_with('/') { - format!("{base_path}{entry_path}") - } else { - format!("{base_path}/{entry_path}") - } -} - fn looks_like_windows_drive_path(path: &str) -> bool { let bytes = path.as_bytes(); bytes.len() >= 3 @@ -716,150 +708,6 @@ mod file_action_test { } } -#[cfg(all( - test, - any( - feature = "storage-cos", - feature = "storage-obs", - feature = "storage-gcs", - feature = "storage-azdls" - ) -))] -mod object_storage_path_test { - use super::*; - - fn assert_relative_paths(file_io: &FileIO, path: &str, expected_relative_path: &str) { - let input = file_io.new_input(path).unwrap(); - assert_eq!(input.location(), path); - assert_eq!( - &input.path[input.relative_path_pos..], - expected_relative_path - ); - - let output = file_io.new_output(path).unwrap(); - assert_eq!(output.location(), path); - assert_eq!( - &output.path[output.relative_path_pos..], - expected_relative_path - ); - - let (_op, relative_path) = file_io.storage.create(path).unwrap(); - assert_eq!(relative_path, expected_relative_path); - - let base_path = &path[..path.len() - relative_path.len()]; - assert_eq!(format!("{base_path}{relative_path}"), path); - } - - #[cfg(feature = "storage-azdls")] - #[test] - fn test_azdls_root_status_path_without_trailing_slash() { - assert_eq!( - status_path( - "abfs://filesystem@account.dfs.core.windows.net", - "warehouse/" - ), - "abfs://filesystem@account.dfs.core.windows.net/warehouse/" - ); - assert_eq!( - status_path( - "abfs://filesystem@account.dfs.core.windows.net/", - "warehouse/" - ), - "abfs://filesystem@account.dfs.core.windows.net/warehouse/" - ); - } - - #[cfg(feature = "storage-cos")] - #[test] - fn test_cos_file_io_relative_paths_and_scheme_aliases() { - for scheme in ["cosn", "cos"] { - let path = format!("{scheme}://bucket/warehouse/table/data.parquet"); - let dir_path = format!("{scheme}://bucket/warehouse/table/"); - let file_io = FileIO::from_path(&path) - .unwrap() - .with_props([ - ("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"), - ("fs.cosn.userinfo.secretId", "secret-id"), - ("fs.cosn.userinfo.secretKey", "secret-key"), - ("fs.cosn.disable-config-load", "true"), - ]) - .build() - .unwrap(); - - assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet"); - assert_relative_paths(&file_io, &dir_path, "warehouse/table/"); - } - } - - #[cfg(feature = "storage-obs")] - #[test] - fn test_obs_file_io_relative_paths() { - let file_io = FileIO::from_path("obs://bucket/warehouse") - .unwrap() - .with_props([ - ( - "fs.obs.endpoint", - "https://obs.cn-north-4.myhuaweicloud.com", - ), - ("fs.obs.access.key", "access-key"), - ("fs.obs.secret.key", "secret-key"), - ]) - .build() - .unwrap(); - - assert_relative_paths( - &file_io, - "obs://bucket/warehouse/table/data.parquet", - "warehouse/table/data.parquet", - ); - assert_relative_paths( - &file_io, - "obs://bucket/warehouse/table/", - "warehouse/table/", - ); - } - - #[cfg(feature = "storage-gcs")] - #[test] - fn test_gcs_file_io_relative_paths_and_scheme_aliases() { - for scheme in ["gs", "gcs"] { - let path = format!("{scheme}://bucket/warehouse/table/data.parquet"); - let dir_path = format!("{scheme}://bucket/warehouse/table/"); - let file_io = FileIO::from_path(&path) - .unwrap() - .with_props([ - ("gcs.allow-anonymous", "true"), - ("gcs.disable-config-load", "true"), - ("gcs.disable-vm-metadata", "true"), - ]) - .build() - .unwrap(); - - assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet"); - assert_relative_paths(&file_io, &dir_path, "warehouse/table/"); - } - } - - #[cfg(feature = "storage-azdls")] - #[test] - fn test_azdls_file_io_relative_paths_and_scheme_aliases() { - for scheme in ["abfs", "abfss"] { - let path = format!( - "{scheme}://filesystem@account.dfs.core.windows.net/warehouse/data.parquet" - ); - let dir_path = format!("{scheme}://filesystem@account.dfs.core.windows.net/warehouse/"); - let file_io = FileIO::from_path(&path) - .unwrap() - .with_prop("azure.account-key", "account-key") - .build() - .unwrap(); - - assert_relative_paths(&file_io, &path, "warehouse/data.parquet"); - assert_relative_paths(&file_io, &dir_path, "warehouse/"); - } - } -} - #[cfg(test)] mod input_output_test { use super::*; diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs index 31f7c0dd..7e49c3c8 100644 --- a/crates/paimon/src/io/mod.rs +++ b/crates/paimon/src/io/mod.rs @@ -21,15 +21,6 @@ pub use file_io::*; mod storage; pub use storage::*; -#[cfg(any( - feature = "storage-s3", - feature = "storage-cos", - feature = "storage-azdls", - feature = "storage-obs", - feature = "storage-gcs" -))] -mod storage_config; - #[cfg(feature = "storage-fs")] mod storage_fs; #[cfg(feature = "storage-fs")] @@ -50,26 +41,6 @@ mod storage_s3; #[cfg(feature = "storage-s3")] use storage_s3::*; -#[cfg(feature = "storage-cos")] -mod storage_cos; -#[cfg(feature = "storage-cos")] -use storage_cos::*; - -#[cfg(feature = "storage-azdls")] -mod storage_azdls; -#[cfg(feature = "storage-azdls")] -use storage_azdls::*; - -#[cfg(feature = "storage-obs")] -mod storage_obs; -#[cfg(feature = "storage-obs")] -use storage_obs::*; - -#[cfg(feature = "storage-gcs")] -mod storage_gcs; -#[cfg(feature = "storage-gcs")] -use storage_gcs::*; - #[cfg(feature = "storage-hdfs")] mod storage_hdfs; #[cfg(feature = "storage-hdfs")] diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs index 59d2740e..a57fcfc2 100644 --- a/crates/paimon/src/io/storage.rs +++ b/crates/paimon/src/io/storage.rs @@ -17,47 +17,22 @@ use std::collections::HashMap; #[cfg(any( - feature = "storage-azdls", - feature = "storage-cos", - feature = "storage-gcs", feature = "storage-oss", - feature = "storage-obs", feature = "storage-s3", feature = "storage-hdfs" ))] use std::sync::Mutex; -#[cfg(any( - feature = "storage-azdls", - feature = "storage-cos", - feature = "storage-gcs", - feature = "storage-oss", - feature = "storage-obs", - feature = "storage-s3" -))] +#[cfg(any(feature = "storage-oss", feature = "storage-s3"))] use std::sync::MutexGuard; -#[cfg(feature = "storage-azdls")] -use super::AzdlsStorageConfig; -#[cfg(feature = "storage-cos")] -use opendal::services::CosConfig; -#[cfg(feature = "storage-gcs")] -use opendal::services::GcsConfig; #[cfg(feature = "storage-hdfs")] use opendal::services::HdfsNativeConfig; -#[cfg(feature = "storage-obs")] -use opendal::services::ObsConfig; #[cfg(feature = "storage-oss")] use opendal::services::OssConfig; #[cfg(feature = "storage-s3")] use opendal::services::S3Config; use opendal::{Operator, Scheme}; -#[cfg(any( - feature = "storage-cos", - feature = "storage-gcs", - feature = "storage-oss", - feature = "storage-obs", - feature = "storage-s3" -))] +#[cfg(any(feature = "storage-oss", feature = "storage-s3"))] use url::Url; use crate::error; @@ -81,26 +56,6 @@ pub enum Storage { config: Box, operators: Mutex>, }, - #[cfg(feature = "storage-cos")] - Cos { - config: Box, - operators: Mutex>, - }, - #[cfg(feature = "storage-azdls")] - Azdls { - config: Box, - operators: Mutex>, - }, - #[cfg(feature = "storage-obs")] - Obs { - config: Box, - operators: Mutex>, - }, - #[cfg(feature = "storage-gcs")] - Gcs { - config: Box, - operators: Mutex>, - }, #[cfg(feature = "storage-hdfs")] Hdfs { config: Box, @@ -138,38 +93,6 @@ impl Storage { operators: Mutex::new(HashMap::new()), }) } - #[cfg(feature = "storage-cos")] - Scheme::Cos => { - let config = super::cos_config_parse(props)?; - Ok(Self::Cos { - config: Box::new(config), - operators: Mutex::new(HashMap::new()), - }) - } - #[cfg(feature = "storage-azdls")] - Scheme::Azdls => { - let config = super::azdls_config_parse(props)?; - Ok(Self::Azdls { - config: Box::new(config), - operators: Mutex::new(HashMap::new()), - }) - } - #[cfg(feature = "storage-obs")] - Scheme::Obs => { - let config = super::obs_config_parse(props)?; - Ok(Self::Obs { - config: Box::new(config), - operators: Mutex::new(HashMap::new()), - }) - } - #[cfg(feature = "storage-gcs")] - Scheme::Gcs => { - let config = super::gcs_config_parse(props)?; - Ok(Self::Gcs { - config: Box::new(config), - operators: Mutex::new(HashMap::new()), - }) - } #[cfg(feature = "storage-hdfs")] Scheme::HdfsNative => { let config = super::hdfs_config_parse(props)?; @@ -192,54 +115,16 @@ impl Storage { Storage::LocalFs { op } => Ok((op.clone(), Self::fs_relative_path(path)?)), #[cfg(feature = "storage-oss")] Storage::Oss { config, operators } => { - let (bucket, relative_path) = - Self::bucket_and_relative_path(path, "OSS", &["oss"])?; + let (bucket, relative_path) = Self::oss_bucket_and_relative_path(path)?; let op = Self::cached_oss_operator(config, operators, path, &bucket)?; Ok((op, relative_path)) } #[cfg(feature = "storage-s3")] Storage::S3 { config, operators } => { - let (bucket, relative_path) = - Self::bucket_and_relative_path(path, "S3", &["s3", "s3a"])?; + let (bucket, relative_path) = Self::s3_bucket_and_relative_path(path)?; let op = Self::cached_s3_operator(config, operators, path, &bucket)?; Ok((op, relative_path)) } - #[cfg(feature = "storage-cos")] - Storage::Cos { config, operators } => { - let (bucket, relative_path) = - Self::bucket_and_relative_path(path, "COS", &["cos", "cosn"])?; - let op = Self::cached_operator(operators, "COS", &bucket, || { - super::cos_config_build(config, path) - })?; - Ok((op, relative_path)) - } - #[cfg(feature = "storage-azdls")] - Storage::Azdls { config, operators } => { - let relative_path = super::azdls_relative_path(path)?; - let cache_key = super::azdls_operator_cache_key(config, path)?; - let op = Self::cached_operator(operators, "Azure", &cache_key, || { - super::azdls_config_build(config, path) - })?; - Ok((op, relative_path)) - } - #[cfg(feature = "storage-obs")] - Storage::Obs { config, operators } => { - let (bucket, relative_path) = - Self::bucket_and_relative_path(path, "OBS", &["obs"])?; - let op = Self::cached_operator(operators, "OBS", &bucket, || { - super::obs_config_build(config, path) - })?; - Ok((op, relative_path)) - } - #[cfg(feature = "storage-gcs")] - Storage::Gcs { config, operators } => { - let (bucket, relative_path) = - Self::bucket_and_relative_path(path, "GCS", &["gcs", "gs"])?; - let op = Self::cached_operator(operators, "GCS", &bucket, || { - super::gcs_config_build(config, path) - })?; - Ok((op, relative_path)) - } #[cfg(feature = "storage-hdfs")] Storage::Hdfs { config, op } => { let relative_path = super::hdfs_relative_path(path)?; @@ -281,52 +166,59 @@ impl Storage { } } - #[cfg(any( - feature = "storage-cos", - feature = "storage-gcs", - feature = "storage-obs", - feature = "storage-oss", - feature = "storage-s3" - ))] - fn bucket_and_relative_path<'a>( - path: &'a str, - storage_name: &str, - allowed_schemes: &[&str], - ) -> crate::Result<(String, &'a str)> { + #[cfg(feature = "storage-oss")] + fn oss_bucket_and_relative_path(path: &str) -> crate::Result<(String, &str)> { let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid { - message: format!("Invalid {storage_name} url: {path}"), + message: format!("Invalid OSS url: {path}"), })?; let bucket = url .host_str() .ok_or_else(|| error::Error::ConfigInvalid { - message: format!("Invalid {storage_name} url: {path}, missing bucket"), + message: format!("Invalid OSS url: {path}, missing bucket"), })? .to_string(); - let scheme = url.scheme(); - if !allowed_schemes.contains(&scheme) { - return Err(error::Error::ConfigInvalid { - message: format!("Invalid {storage_name} url: {path}, unsupported scheme {scheme}"), - }); - } - let prefix = format!("{scheme}://{bucket}/"); + let prefix = format!("oss://{bucket}/"); let relative_path = path.strip_prefix(&prefix) .ok_or_else(|| error::Error::ConfigInvalid { + message: format!("Invalid OSS url: {path}, should start with {prefix}"), + })?; + Ok((bucket, relative_path)) + } + + #[cfg(feature = "storage-s3")] + fn s3_bucket_and_relative_path(path: &str) -> crate::Result<(String, &str)> { + let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid { + message: format!("Invalid S3 url: {path}"), + })?; + let bucket = url + .host_str() + .ok_or_else(|| error::Error::ConfigInvalid { + message: format!("Invalid S3 url: {path}, missing bucket"), + })? + .to_string(); + let scheme = url.scheme(); + let prefix = match scheme { + "s3" | "s3a" => format!("{scheme}://{bucket}/"), + _ => { + return Err(error::Error::ConfigInvalid { message: format!( - "Invalid {storage_name} url: {path}, should start with {prefix}" + "Invalid S3 url: {path}, should start with s3://{bucket}/ or s3a://{bucket}/" ), + }); + } + }; + let relative_path = + path.strip_prefix(&prefix) + .ok_or_else(|| error::Error::ConfigInvalid { + message: format!( + "Invalid S3 url: {path}, should start with s3://{bucket}/ or s3a://{bucket}/" + ), })?; Ok((bucket, relative_path)) } - #[cfg(any( - feature = "storage-azdls", - feature = "storage-cos", - feature = "storage-gcs", - feature = "storage-oss", - feature = "storage-obs", - feature = "storage-s3" - ))] + #[cfg(any(feature = "storage-oss", feature = "storage-s3"))] fn lock_operator_cache<'a>( operators: &'a Mutex>, storage_name: &str, @@ -337,30 +229,6 @@ impl Storage { }) } - #[cfg(any( - feature = "storage-azdls", - feature = "storage-cos", - feature = "storage-gcs", - feature = "storage-oss", - feature = "storage-obs", - feature = "storage-s3" - ))] - fn cached_operator( - operators: &Mutex>, - storage_name: &str, - cache_key: &str, - build: impl FnOnce() -> crate::Result, - ) -> crate::Result { - let mut operators = Self::lock_operator_cache(operators, storage_name)?; - if let Some(op) = operators.get(cache_key) { - return Ok(op.clone()); - } - - let op = build()?; - operators.insert(cache_key.to_string(), op.clone()); - Ok(op) - } - #[cfg(feature = "storage-oss")] fn cached_oss_operator( config: &OssConfig, @@ -368,9 +236,14 @@ impl Storage { path: &str, bucket: &str, ) -> crate::Result { - Self::cached_operator(operators, "OSS", bucket, || { - super::oss_config_build(config, path) - }) + let mut operators = Self::lock_operator_cache(operators, "OSS")?; + if let Some(op) = operators.get(bucket) { + return Ok(op.clone()); + } + + let op = super::oss_config_build(config, path)?; + operators.insert(bucket.to_string(), op.clone()); + Ok(op) } #[cfg(feature = "storage-s3")] @@ -380,9 +253,14 @@ impl Storage { path: &str, bucket: &str, ) -> crate::Result { - Self::cached_operator(operators, "S3", bucket, || { - super::s3_config_build(config, path) - }) + let mut operators = Self::lock_operator_cache(operators, "S3")?; + if let Some(op) = operators.get(bucket) { + return Ok(op.clone()); + } + + let op = super::s3_config_build(config, path)?; + operators.insert(bucket.to_string(), op.clone()); + Ok(op) } fn parse_scheme(scheme: &str) -> crate::Result { @@ -390,9 +268,6 @@ impl Storage { "memory" => Ok(Scheme::Memory), "file" | "" => Ok(Scheme::Fs), "s3" | "s3a" => Ok(Scheme::S3), - "cosn" => Ok(Scheme::Cos), - "abfs" | "abfss" | "az" | "azure" => Ok(Scheme::Azdls), - "gs" => Ok(Scheme::Gcs), "hdfs" => Ok(Scheme::HdfsNative), s => Ok(s.parse::()?), } diff --git a/crates/paimon/src/io/storage_azdls.rs b/crates/paimon/src/io/storage_azdls.rs deleted file mode 100644 index 8a71cdd4..00000000 --- a/crates/paimon/src/io/storage_azdls.rs +++ /dev/null @@ -1,405 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::HashMap; - -use opendal::services::AzdlsConfig; -use opendal::{Configurator, Operator}; -use url::Url; - -use crate::error::Error; -use crate::Result; - -use super::storage_config::normalize_storage_config; - -const AZURE_ENDPOINT: &str = "azure.endpoint"; -const AZURE_ACCOUNT_NAME: &str = "azure.account-name"; -const AZURE_ACCOUNT_KEY: &str = "azure.account-key"; -const AZURE_SAS_TOKEN: &str = "azure.sas-token"; - -const CONFIG_PREFIXES: &[&str] = &["fs.azure.", "fs.abfs.", "abfs.", "abfss.", "azure."]; -const MIRRORED_KEYS: &[(&str, &str)] = &[ - ("azure.account-name", "azure.account.name"), - ("azure.account_name", "azure.account.name"), - ("azure.account-key", "azure.account.key"), - ("azure.account_key", "azure.account.key"), - ("azure.sas-token", "azure.sas.token"), - ("azure.sas_token", "azure.sas.token"), - ("azure.client-id", "azure.client.id"), - ("azure.client_id", "azure.client.id"), - ("azure.client-secret", "azure.client.secret"), - ("azure.client_secret", "azure.client.secret"), - ("azure.tenant-id", "azure.tenant.id"), - ("azure.tenant_id", "azure.tenant.id"), - ("azure.authority-host", "azure.authority.host"), - ("azure.authority_host", "azure.authority.host"), -]; - -#[derive(Debug, Clone)] -pub struct AzdlsStorageConfig { - config: AzdlsConfig, - normalized: HashMap, -} - -pub(crate) fn azdls_config_parse(props: HashMap) -> Result { - let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "azure.", MIRRORED_KEYS); - let config = config_from_normalized(&normalized); - - Ok(AzdlsStorageConfig { config, normalized }) -} - -pub(crate) fn azdls_config_build(cfg: &AzdlsStorageConfig, path: &str) -> Result { - let (cfg, relative_path) = azdls_config_for_path(cfg, path)?; - - let builder = cfg.into_builder(); - let op = Operator::new(builder)?.finish(); - - debug_assert_eq!( - relative_path, - azdls_relative_path(path).unwrap_or(relative_path) - ); - Ok(op) -} - -pub(crate) fn azdls_operator_cache_key(cfg: &AzdlsStorageConfig, path: &str) -> Result { - let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { - message: format!("Invalid Azure url: {path}"), - })?; - let filesystem = if cfg.config.filesystem.is_empty() { - filesystem_from_url(&url, path)? - } else { - cfg.config.filesystem.clone() - }; - let endpoint = effective_endpoint(&cfg.config, &url)?; - - Ok(format!("{}|{}", endpoint.trim_end_matches('/'), filesystem)) -} - -fn azdls_config_for_path<'a>( - storage_cfg: &AzdlsStorageConfig, - path: &'a str, -) -> Result<(AzdlsConfig, &'a str)> { - let (filesystem, relative_path) = azdls_filesystem_and_relative_path(path)?; - let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { - message: format!("Invalid Azure url: {path}"), - })?; - - let mut cfg = storage_cfg.config.clone(); - if cfg.filesystem.is_empty() { - cfg.filesystem = filesystem; - } - - let endpoint = effective_endpoint(&cfg, &url)?; - apply_account_scoped_config(&mut cfg, &storage_cfg.normalized, &endpoint); - cfg.endpoint = Some(endpoint); - cfg.root = Some("/".to_string()); - - Ok((cfg, relative_path)) -} - -fn config_from_normalized(normalized: &HashMap) -> AzdlsConfig { - AzdlsConfig { - endpoint: normalized.get(AZURE_ENDPOINT).cloned(), - account_name: normalized.get(AZURE_ACCOUNT_NAME).cloned(), - account_key: normalized.get(AZURE_ACCOUNT_KEY).cloned(), - sas_token: normalized.get(AZURE_SAS_TOKEN).cloned(), - client_id: normalized.get("azure.client-id").cloned(), - client_secret: normalized.get("azure.client-secret").cloned(), - tenant_id: normalized.get("azure.tenant-id").cloned(), - authority_host: normalized.get("azure.authority-host").cloned(), - ..Default::default() - } -} - -fn effective_endpoint(cfg: &AzdlsConfig, url: &Url) -> Result { - cfg.endpoint - .as_ref() - .map(|endpoint| endpoint.trim_end_matches('/').to_string()) - .map(Ok) - .unwrap_or_else(|| default_endpoint(url)) -} - -pub(crate) fn azdls_filesystem_and_relative_path(path: &str) -> Result<(String, &str)> { - let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { - message: format!("Invalid Azure url: {path}"), - })?; - - let filesystem = filesystem_from_url(&url, path)?; - - Ok((filesystem, azdls_relative_path(path)?)) -} - -pub(crate) fn azdls_relative_path(path: &str) -> Result<&str> { - let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { - message: format!("Invalid Azure url: {path}"), - })?; - - let path_start = path - .find("://") - .map(|pos| pos + 3) - .ok_or_else(|| Error::ConfigInvalid { - message: format!("Invalid Azure url: {path}"), - })?; - let after_scheme = &path[path_start..]; - let path_start = after_scheme.find('/').map(|pos| path_start + pos + 1); - let url_path = path_start.map(|pos| &path[pos..]).unwrap_or(""); - - if !url.username().is_empty() - || !url - .host_str() - .ok_or_else(|| Error::ConfigInvalid { - message: format!("Invalid Azure url: {path}, missing filesystem"), - })? - .contains('.') - { - Ok(url_path) - } else { - let (_filesystem, relative_path) = url_path.split_once('/').unwrap_or((url_path, "")); - Ok(relative_path) - } -} - -fn filesystem_from_url(url: &Url, path: &str) -> Result { - if !url.username().is_empty() { - return Ok(url.username().to_string()); - } - - let host = url.host_str().ok_or_else(|| Error::ConfigInvalid { - message: format!("Invalid Azure url: {path}, missing filesystem"), - })?; - - if !host.contains('.') { - return Ok(host.to_string()); - } - - url.path() - .strip_prefix('/') - .unwrap_or(url.path()) - .split('/') - .next() - .filter(|v| !v.is_empty()) - .ok_or_else(|| Error::ConfigInvalid { - message: format!("Invalid Azure url: {path}, missing filesystem"), - }) - .map(|v| v.to_string()) -} - -fn default_endpoint(url: &Url) -> Result { - if !url.username().is_empty() { - let host = url.host_str().ok_or_else(|| Error::ConfigInvalid { - message: format!("Invalid Azure url: {url}, missing account host"), - })?; - return Ok(format!("https://{host}")); - } - - let host = url.host_str().ok_or_else(|| Error::ConfigInvalid { - message: format!("Invalid Azure url: {url}, missing account"), - })?; - - if host.contains('.') { - Ok(format!("https://{host}")) - } else { - Err(Error::ConfigInvalid { - message: format!( - "Invalid Azure url: {url}, missing account host; set azure.endpoint for {host}" - ), - }) - } -} - -fn apply_account_scoped_config( - cfg: &mut AzdlsConfig, - normalized: &HashMap, - endpoint: &str, -) { - let Some(host) = endpoint_host(endpoint) else { - return; - }; - let account = host.split('.').next().unwrap_or(host.as_str()); - - if cfg.account_key.is_none() { - cfg.account_key = first_scoped_value( - normalized, - &[ - "azure.account.key", - "azure.account-key", - "azure.account_key", - ], - &[host.as_str(), account], - ); - } - - if cfg.sas_token.is_none() { - cfg.sas_token = first_scoped_value( - normalized, - &[ - "azure.sas.token", - "azure.sas-token", - "azure.sas_token", - "azure.sas.fixed.token", - "azure.fixed.sas.token", - ], - &[host.as_str(), account], - ); - } -} - -fn endpoint_host(endpoint: &str) -> Option { - Url::parse(endpoint) - .ok() - .and_then(|url| url.host_str().map(|host| host.to_string())) -} - -fn first_scoped_value( - normalized: &HashMap, - prefixes: &[&str], - suffixes: &[&str], -) -> Option { - prefixes.iter().find_map(|prefix| { - suffixes - .iter() - .find_map(|suffix| normalized.get(&format!("{prefix}.{suffix}")).cloned()) - }) -} - -#[cfg(test)] -mod tests { - use super::*; - - fn make_props(pairs: &[(&str, &str)]) -> HashMap { - pairs - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect() - } - - #[test] - fn test_azdls_config_parse_keys() { - let props = make_props(&[ - ("fs.azure.account.key", "key"), - ("fs.azure.sas.token", "sas"), - ("azure.endpoint", "https://account.dfs.core.windows.net"), - ]); - - let cfg = azdls_config_parse(props).unwrap(); - assert_eq!( - cfg.config.endpoint.as_deref(), - Some("https://account.dfs.core.windows.net") - ); - assert_eq!(cfg.config.account_key.as_deref(), Some("key")); - assert_eq!(cfg.config.sas_token.as_deref(), Some("sas")); - } - - #[test] - fn test_azdls_config_parse_aliases() { - let props = make_props(&[ - ("azure.account-name", "account"), - ("azure.client-secret", "secret"), - ("azure.tenant-id", "tenant"), - ]); - - let cfg = azdls_config_parse(props).unwrap(); - assert_eq!(cfg.config.account_name.as_deref(), Some("account")); - assert_eq!(cfg.config.client_secret.as_deref(), Some("secret")); - assert_eq!(cfg.config.tenant_id.as_deref(), Some("tenant")); - } - - #[test] - fn test_azdls_config_uses_account_scoped_hadoop_key() { - let cfg = azdls_config_parse(make_props(&[( - "fs.azure.account.key.account.dfs.core.windows.net", - "account-key", - )])) - .unwrap(); - - let (cfg, _) = - azdls_config_for_path(&cfg, "abfs://fs@account.dfs.core.windows.net/path/to/file") - .unwrap(); - - assert_eq!(cfg.account_key.as_deref(), Some("account-key")); - } - - #[test] - fn test_azdls_path_hadoop_authority_form() { - let (filesystem, relative_path) = - azdls_filesystem_and_relative_path("abfs://fs@account.dfs.core.windows.net/a/b") - .unwrap(); - assert_eq!(filesystem, "fs"); - assert_eq!(relative_path, "a/b"); - } - - #[test] - fn test_azdls_path_fsspec_form() { - let (filesystem, relative_path) = - azdls_filesystem_and_relative_path("abfs://fs/a/b").unwrap(); - assert_eq!(filesystem, "fs"); - assert_eq!(relative_path, "a/b"); - } - - #[test] - fn test_azdls_config_build_hadoop_form() { - let cfg = azdls_config_parse(HashMap::new()).unwrap(); - - let op = azdls_config_build(&cfg, "abfs://fs@account.dfs.core.windows.net/a/b").unwrap(); - assert_eq!(op.info().name(), "fs"); - } - - #[test] - fn test_azdls_config_build_fsspec_form_requires_endpoint() { - let cfg = azdls_config_parse(HashMap::new()).unwrap(); - let result = azdls_config_build(&cfg, "abfs://fs/a/b"); - assert!(result.is_err()); - } - - #[test] - fn test_azdls_config_build_fsspec_form_with_endpoint() { - let cfg = azdls_config_parse(make_props(&[( - "azure.endpoint", - "https://account.dfs.core.windows.net", - )])) - .unwrap(); - - let op = azdls_config_build(&cfg, "abfs://fs/a/b").unwrap(); - assert_eq!(op.info().name(), "fs"); - } - - #[test] - fn test_azdls_cache_key_includes_account_host() { - let cfg = azdls_config_parse(HashMap::new()).unwrap(); - - let account_a = azdls_operator_cache_key( - &cfg, - "abfs://fs@account-a.dfs.core.windows.net/path/to/file", - ) - .unwrap(); - let account_b = azdls_operator_cache_key( - &cfg, - "abfs://fs@account-b.dfs.core.windows.net/path/to/file", - ) - .unwrap(); - - assert_ne!(account_a, account_b); - assert_eq!(account_a, "https://account-a.dfs.core.windows.net|fs"); - } - - #[test] - fn test_azdls_config_build_missing_filesystem() { - let cfg = azdls_config_parse(HashMap::new()).unwrap(); - let result = azdls_config_build(&cfg, "abfs:///path/without/filesystem"); - assert!(result.is_err()); - } -} diff --git a/crates/paimon/src/io/storage_config.rs b/crates/paimon/src/io/storage_config.rs deleted file mode 100644 index 90206db0..00000000 --- a/crates/paimon/src/io/storage_config.rs +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::HashMap; - -pub(super) fn normalize_storage_config( - props: HashMap, - config_prefixes: &[&str], - canonical_prefix: &str, - mirrored_keys: &[(&str, &str)], -) -> HashMap { - let mut result = HashMap::new(); - - for prefix in config_prefixes { - for (key, value) in &props { - if let Some(suffix) = key.strip_prefix(prefix) { - result.insert(format!("{canonical_prefix}{suffix}"), value.clone()); - } - } - } - - let mirrored_additions: Vec<(String, String)> = mirrored_keys - .iter() - .flat_map(|(a, b)| { - let mut pairs = Vec::new(); - - if !result.contains_key(*b) { - if let Some(v) = result.get(*a) { - pairs.push((b.to_string(), v.clone())); - } - } - if !result.contains_key(*a) { - if let Some(v) = result.get(*b) { - pairs.push((a.to_string(), v.clone())); - } - } - pairs - }) - .collect(); - - for (k, v) in mirrored_additions { - result.insert(k, v); - } - - result -} diff --git a/crates/paimon/src/io/storage_cos.rs b/crates/paimon/src/io/storage_cos.rs deleted file mode 100644 index b8bb3ea4..00000000 --- a/crates/paimon/src/io/storage_cos.rs +++ /dev/null @@ -1,136 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::HashMap; - -use opendal::services::CosConfig; -use opendal::{Configurator, Operator}; -use url::Url; - -use crate::error::Error; -use crate::Result; - -use super::storage_config::normalize_storage_config; - -const COS_ENDPOINT: &str = "fs.cosn.endpoint"; -const COS_SECRET_ID: &str = "fs.cosn.userinfo.secretId"; -const COS_SECRET_KEY: &str = "fs.cosn.userinfo.secretKey"; - -const CONFIG_PREFIXES: &[&str] = &["fs.cosn.", "cosn.", "cos."]; -const MIRRORED_KEYS: &[(&str, &str)] = &[ - ("fs.cosn.endpoint", "fs.cosn.userinfo.endpoint"), - ("fs.cosn.secret_id", "fs.cosn.userinfo.secretId"), - ("fs.cosn.secret-id", "fs.cosn.userinfo.secretId"), - ("fs.cosn.secret_key", "fs.cosn.userinfo.secretKey"), - ("fs.cosn.secret-key", "fs.cosn.userinfo.secretKey"), -]; - -pub(crate) fn cos_config_parse(props: HashMap) -> Result { - let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "fs.cosn.", MIRRORED_KEYS); - - let cfg = CosConfig { - endpoint: normalized.get(COS_ENDPOINT).cloned(), - secret_id: normalized.get(COS_SECRET_ID).cloned(), - secret_key: normalized.get(COS_SECRET_KEY).cloned(), - enable_versioning: normalized - .get("fs.cosn.enable-versioning") - .is_some_and(|v| v.eq_ignore_ascii_case("true")), - disable_config_load: normalized - .get("fs.cosn.disable-config-load") - .is_some_and(|v| v.eq_ignore_ascii_case("true")), - ..Default::default() - }; - - Ok(cfg) -} - -pub(crate) fn cos_config_build(cfg: &CosConfig, path: &str) -> Result { - let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { - message: format!("Invalid COS url: {path}"), - })?; - - let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid { - message: format!("Invalid COS url: {path}, missing bucket"), - })?; - - let builder = cfg.clone().into_builder().bucket(bucket); - Ok(Operator::new(builder)?.finish()) -} - -#[cfg(test)] -mod tests { - use super::*; - - fn make_props(pairs: &[(&str, &str)]) -> HashMap { - pairs - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect() - } - - #[test] - fn test_cos_config_parse_hadoop_keys() { - let props = make_props(&[ - ("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"), - ("fs.cosn.userinfo.secretId", "sid"), - ("fs.cosn.userinfo.secretKey", "skey"), - ]); - - let cfg = cos_config_parse(props).unwrap(); - assert_eq!( - cfg.endpoint.as_deref(), - Some("https://cos.ap-shanghai.myqcloud.com") - ); - assert_eq!(cfg.secret_id.as_deref(), Some("sid")); - assert_eq!(cfg.secret_key.as_deref(), Some("skey")); - } - - #[test] - fn test_cos_config_parse_canonical_aliases() { - let props = make_props(&[ - ("cos.endpoint", "https://cos.ap-singapore.myqcloud.com"), - ("cos.secret-id", "sid"), - ("cos.secret-key", "skey"), - ]); - - let cfg = cos_config_parse(props).unwrap(); - assert_eq!( - cfg.endpoint.as_deref(), - Some("https://cos.ap-singapore.myqcloud.com") - ); - assert_eq!(cfg.secret_id.as_deref(), Some("sid")); - assert_eq!(cfg.secret_key.as_deref(), Some("skey")); - } - - #[test] - fn test_cos_config_build_extracts_bucket() { - let cfg = CosConfig { - endpoint: Some("https://cos.ap-shanghai.myqcloud.com".to_string()), - ..Default::default() - }; - - let op = cos_config_build(&cfg, "cosn://my-bucket/some/path").unwrap(); - assert_eq!(op.info().name(), "my-bucket"); - } - - #[test] - fn test_cos_config_build_missing_bucket() { - let cfg = CosConfig::default(); - let result = cos_config_build(&cfg, "cosn:///path/without/bucket"); - assert!(result.is_err()); - } -} diff --git a/crates/paimon/src/io/storage_gcs.rs b/crates/paimon/src/io/storage_gcs.rs deleted file mode 100644 index 575dae3f..00000000 --- a/crates/paimon/src/io/storage_gcs.rs +++ /dev/null @@ -1,201 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::HashMap; - -use opendal::services::GcsConfig; -use opendal::{Configurator, Operator}; -use url::Url; - -use crate::error::Error; -use crate::Result; - -use super::storage_config::normalize_storage_config; - -const GCS_ENDPOINT: &str = "gcs.endpoint"; -const GCS_CREDENTIAL: &str = "gcs.credential"; -const GCS_CREDENTIAL_PATH: &str = "gcs.credential-path"; -const GCS_SERVICE_ACCOUNT: &str = "gcs.service-account"; -const GCS_ALLOW_ANONYMOUS: &str = "gcs.allow-anonymous"; - -const CONFIG_PREFIXES: &[&str] = &["fs.gs.", "fs.gcs.", "gs.", "gcs."]; -const MIRRORED_KEYS: &[(&str, &str)] = &[ - ("gcs.credential-path", "gcs.google_application_credentials"), - ("gcs.credential-path", "gcs.google-application-credentials"), - ("gcs.credential-path", "gcs.application-credentials"), - ("gcs.credential", "gcs.google_service_account_key"), - ("gcs.credential", "gcs.google-service-account-key"), - ("gcs.credential", "gcs.service-account-key"), - ("gcs.credential", "gcs.service_account_key"), - ("gcs.service-account", "gcs.google_service_account"), - ("gcs.service-account", "gcs.google-service-account"), - ("gcs.service-account", "gcs.service_account"), - ("gcs.predefined-acl", "gcs.predefined_acl"), - ("gcs.default-storage-class", "gcs.default_storage_class"), - ("gcs.allow-anonymous", "gcs.google_skip_signature"), - ("gcs.allow-anonymous", "gcs.google-skip-signature"), - ("gcs.allow_anonymous", "gcs.google_skip_signature"), - ("gcs.allow-anonymous", "gcs.allow_anonymous"), - ("gcs.allow-anonymous", "gcs.skip-signature"), - ("gcs.allow-anonymous", "gcs.skip_signature"), - ("gcs.skip-signature", "gcs.google_skip_signature"), - ("gcs.skip_signature", "gcs.google_skip_signature"), - ("gcs.disable-vm-metadata", "gcs.disable_vm_metadata"), - ("gcs.disable-config-load", "gcs.disable_config_load"), -]; - -#[allow(clippy::field_reassign_with_default)] -pub(crate) fn gcs_config_parse(props: HashMap) -> Result { - let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "gcs.", MIRRORED_KEYS); - - let mut cfg = GcsConfig::default(); - cfg.endpoint = normalized.get(GCS_ENDPOINT).cloned(); - cfg.credential = normalized.get(GCS_CREDENTIAL).cloned(); - cfg.credential_path = normalized.get(GCS_CREDENTIAL_PATH).cloned(); - cfg.service_account = normalized.get(GCS_SERVICE_ACCOUNT).cloned(); - cfg.scope = normalized.get("gcs.scope").cloned(); - cfg.predefined_acl = normalized.get("gcs.predefined-acl").cloned(); - cfg.default_storage_class = normalized.get("gcs.default-storage-class").cloned(); - cfg.token = normalized.get("gcs.token").cloned(); - cfg.allow_anonymous = normalized - .get(GCS_ALLOW_ANONYMOUS) - .is_some_and(|v| v.eq_ignore_ascii_case("true")); - cfg.disable_vm_metadata = normalized - .get("gcs.disable-vm-metadata") - .is_some_and(|v| v.eq_ignore_ascii_case("true")); - cfg.disable_config_load = normalized - .get("gcs.disable-config-load") - .is_some_and(|v| v.eq_ignore_ascii_case("true")); - - Ok(cfg) -} - -pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result { - let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { - message: format!("Invalid GCS url: {path}"), - })?; - - let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid { - message: format!("Invalid GCS url: {path}, missing bucket"), - })?; - - let builder = cfg.clone().into_builder().bucket(bucket); - Ok(Operator::new(builder)?.finish()) -} - -#[cfg(test)] -mod tests { - use super::*; - - fn make_props(pairs: &[(&str, &str)]) -> HashMap { - pairs - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect() - } - - #[test] - fn test_gcs_config_parse_keys() { - let props = make_props(&[ - ("fs.gs.endpoint", "https://storage.googleapis.com"), - ("fs.gs.google_application_credentials", "/tmp/gcs.json"), - ("fs.gs.google_service_account_key", "credential-json"), - ( - "fs.gs.google_service_account", - "sa@example.iam.gserviceaccount.com", - ), - ("fs.gs.predefined_acl", "bucketOwnerFullControl"), - ("fs.gs.default_storage_class", "NEARLINE"), - ]); - - let cfg = gcs_config_parse(props).unwrap(); - assert_eq!( - cfg.endpoint.as_deref(), - Some("https://storage.googleapis.com") - ); - assert_eq!(cfg.credential_path.as_deref(), Some("/tmp/gcs.json")); - assert_eq!(cfg.credential.as_deref(), Some("credential-json")); - assert_eq!( - cfg.service_account.as_deref(), - Some("sa@example.iam.gserviceaccount.com") - ); - assert_eq!( - cfg.predefined_acl.as_deref(), - Some("bucketOwnerFullControl") - ); - assert_eq!(cfg.default_storage_class.as_deref(), Some("NEARLINE")); - } - - #[test] - fn test_gcs_config_parse_canonical_aliases() { - let props = make_props(&[ - ("gcs.credential-path", "/tmp/gcs.json"), - ("gcs.allow-anonymous", "true"), - ]); - - let cfg = gcs_config_parse(props).unwrap(); - assert_eq!(cfg.credential_path.as_deref(), Some("/tmp/gcs.json")); - assert!(cfg.allow_anonymous); - } - - #[test] - fn test_gcs_config_parse_opendal_aliases() { - let props = make_props(&[ - ( - "gcs.google_application_credentials", - "/tmp/opendal-gcs.json", - ), - ("gcs.google_service_account_key", "credential-json"), - ( - "gcs.google_service_account", - "opendal-sa@example.iam.gserviceaccount.com", - ), - ("gcs.google_skip_signature", "true"), - ("gcs.disable_vm_metadata", "true"), - ("gcs.disable_config_load", "true"), - ]); - - let cfg = gcs_config_parse(props).unwrap(); - assert_eq!( - cfg.credential_path.as_deref(), - Some("/tmp/opendal-gcs.json") - ); - assert_eq!(cfg.credential.as_deref(), Some("credential-json")); - assert_eq!( - cfg.service_account.as_deref(), - Some("opendal-sa@example.iam.gserviceaccount.com") - ); - assert!(cfg.allow_anonymous); - assert!(cfg.disable_vm_metadata); - assert!(cfg.disable_config_load); - } - - #[test] - fn test_gcs_config_build_extracts_bucket() { - let cfg = GcsConfig::default(); - - let op = gcs_config_build(&cfg, "gs://my-bucket/some/path").unwrap(); - assert_eq!(op.info().name(), "my-bucket"); - } - - #[test] - fn test_gcs_config_build_missing_bucket() { - let cfg = GcsConfig::default(); - let result = gcs_config_build(&cfg, "gs:///path/without/bucket"); - assert!(result.is_err()); - } -} diff --git a/crates/paimon/src/io/storage_obs.rs b/crates/paimon/src/io/storage_obs.rs deleted file mode 100644 index 2fde849e..00000000 --- a/crates/paimon/src/io/storage_obs.rs +++ /dev/null @@ -1,133 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::HashMap; - -use opendal::services::ObsConfig; -use opendal::{Configurator, Operator}; -use url::Url; - -use crate::error::Error; -use crate::Result; - -use super::storage_config::normalize_storage_config; - -const OBS_ENDPOINT: &str = "fs.obs.endpoint"; -const OBS_ACCESS_KEY_ID: &str = "fs.obs.access.key"; -const OBS_SECRET_ACCESS_KEY: &str = "fs.obs.secret.key"; - -const CONFIG_PREFIXES: &[&str] = &["fs.obs.", "obs."]; -const MIRRORED_KEYS: &[(&str, &str)] = &[ - ("fs.obs.access-key-id", "fs.obs.access.key"), - ("fs.obs.access_key_id", "fs.obs.access.key"), - ("fs.obs.secret-access-key", "fs.obs.secret.key"), - ("fs.obs.secret_access_key", "fs.obs.secret.key"), -]; - -#[allow(clippy::field_reassign_with_default)] -pub(crate) fn obs_config_parse(props: HashMap) -> Result { - let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "fs.obs.", MIRRORED_KEYS); - - let mut cfg = ObsConfig::default(); - cfg.endpoint = normalized.get(OBS_ENDPOINT).cloned(); - cfg.access_key_id = normalized.get(OBS_ACCESS_KEY_ID).cloned(); - cfg.secret_access_key = normalized.get(OBS_SECRET_ACCESS_KEY).cloned(); - cfg.enable_versioning = normalized - .get("fs.obs.enable-versioning") - .is_some_and(|v| v.eq_ignore_ascii_case("true")); - - Ok(cfg) -} - -pub(crate) fn obs_config_build(cfg: &ObsConfig, path: &str) -> Result { - let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { - message: format!("Invalid OBS url: {path}"), - })?; - - let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid { - message: format!("Invalid OBS url: {path}, missing bucket"), - })?; - - let builder = cfg.clone().into_builder().bucket(bucket); - Ok(Operator::new(builder)?.finish()) -} - -#[cfg(test)] -mod tests { - use super::*; - - fn make_props(pairs: &[(&str, &str)]) -> HashMap { - pairs - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect() - } - - #[test] - fn test_obs_config_parse_hadoop_keys() { - let props = make_props(&[ - ( - "fs.obs.endpoint", - "https://obs.cn-north-4.myhuaweicloud.com", - ), - ("fs.obs.access.key", "ak"), - ("fs.obs.secret.key", "sk"), - ]); - - let cfg = obs_config_parse(props).unwrap(); - assert_eq!( - cfg.endpoint.as_deref(), - Some("https://obs.cn-north-4.myhuaweicloud.com") - ); - assert_eq!(cfg.access_key_id.as_deref(), Some("ak")); - assert_eq!(cfg.secret_access_key.as_deref(), Some("sk")); - } - - #[test] - fn test_obs_config_parse_canonical_aliases() { - let props = make_props(&[ - ("obs.endpoint", "https://obs.cn-north-4.myhuaweicloud.com"), - ("obs.access-key-id", "ak"), - ("obs.secret-access-key", "sk"), - ]); - - let cfg = obs_config_parse(props).unwrap(); - assert_eq!( - cfg.endpoint.as_deref(), - Some("https://obs.cn-north-4.myhuaweicloud.com") - ); - assert_eq!(cfg.access_key_id.as_deref(), Some("ak")); - assert_eq!(cfg.secret_access_key.as_deref(), Some("sk")); - } - - #[test] - #[allow(clippy::field_reassign_with_default)] - fn test_obs_config_build_extracts_bucket() { - let mut cfg = ObsConfig::default(); - cfg.endpoint = Some("https://obs.cn-north-4.myhuaweicloud.com".to_string()); - - let op = obs_config_build(&cfg, "obs://my-bucket/some/path").unwrap(); - assert_eq!(op.info().name(), "my-bucket"); - } - - #[test] - fn test_obs_config_build_missing_bucket() { - let cfg = ObsConfig::default(); - let result = obs_config_build(&cfg, "obs:///path/without/bucket"); - assert!(result.is_err()); - } -} diff --git a/crates/paimon/src/io/storage_s3.rs b/crates/paimon/src/io/storage_s3.rs index 6fe20b87..57b77c75 100644 --- a/crates/paimon/src/io/storage_s3.rs +++ b/crates/paimon/src/io/storage_s3.rs @@ -24,8 +24,6 @@ use url::Url; use crate::error::Error; use crate::Result; -use super::storage_config::normalize_storage_config; - /// Configuration key for S3 endpoint. /// /// Compatible with paimon-java's `s3.endpoint` / `fs.s3a.endpoint`. @@ -81,18 +79,14 @@ const MIRRORED_KEYS: &[(&str, &str)] = &[ /// By default, virtual-hosted style addressing is enabled (matching AWS /// and Java Paimon behavior). Set `s3.path-style-access=true` to switch /// to path-style for S3-compatible stores like MinIO. -#[allow(clippy::field_reassign_with_default)] pub(crate) fn s3_config_parse(props: HashMap) -> Result { - let normalized = normalize_storage_config(props, JAVA_CONFIG_PREFIXES, "s3.", MIRRORED_KEYS); + let normalized = normalize_config(props); let mut cfg = S3Config::default(); // Default to virtual-hosted style, matching AWS and Java Paimon. // Only disable when path-style-access is explicitly set to true. - let path_style_access = normalized - .get(S3_PATH_STYLE_ACCESS) - .is_some_and(|v| v.eq_ignore_ascii_case("true")); - cfg.enable_virtual_host_style = !path_style_access; + cfg.enable_virtual_host_style = true; // Core connection settings. cfg.endpoint = normalized.get(S3_ENDPOINT).cloned(); @@ -100,6 +94,12 @@ pub(crate) fn s3_config_parse(props: HashMap) -> Result) -> Result Result { Ok(Operator::new(builder)?.finish()) } +/// Normalize Java-compatible config keys to canonical `s3.*` form. +/// +/// 1. Strips known prefixes (`fs.s3a.`, `s3a.`, `s3.`) and remaps to `s3.*`. +/// 2. Applies mirrored key mappings for cross-compatibility. +/// 3. Earlier prefixes in the list take lower priority (later ones overwrite). +fn normalize_config(props: HashMap) -> HashMap { + let mut result = HashMap::new(); + + // First pass: normalize prefixes. Process in priority order — + // `fs.s3a.` (lowest) → `s3a.` → `s3.` (highest, canonical). + for prefix in JAVA_CONFIG_PREFIXES { + for (key, value) in &props { + if let Some(suffix) = key.strip_prefix(prefix) { + let canonical = format!("s3.{suffix}"); + result.insert(canonical, value.clone()); + } + } + } + + // Second pass: apply mirrored keys bidirectionally (only if target not already set). + let mirrored_additions: Vec<(String, String)> = MIRRORED_KEYS + .iter() + .flat_map(|(a, b)| { + let mut pairs = Vec::new(); + // a → b + if !result.contains_key(*b) { + if let Some(v) = result.get(*a) { + pairs.push((b.to_string(), v.clone())); + } + } + // b → a + if !result.contains_key(*a) { + if let Some(v) = result.get(*b) { + pairs.push((a.to_string(), v.clone())); + } + } + pairs + }) + .collect(); + + for (k, v) in mirrored_additions { + result.insert(k, v); + } + + result +} + #[cfg(test)] mod tests { use super::*; @@ -304,7 +353,6 @@ mod tests { } #[test] - #[allow(clippy::field_reassign_with_default)] fn test_s3_config_build_extracts_bucket() { let mut cfg = S3Config::default(); cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string()); @@ -315,7 +363,6 @@ mod tests { } #[test] - #[allow(clippy::field_reassign_with_default)] fn test_s3_config_build_s3a_scheme() { let mut cfg = S3Config::default(); cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string()); @@ -343,8 +390,7 @@ mod tests { fn test_mirrored_keys() { // `s3.access.key` (dot form) should be mirrored from `s3.access-key` (dash form) let props = make_props(&[("s3.access-key", "AKID")]); - let normalized = - normalize_storage_config(props, JAVA_CONFIG_PREFIXES, "s3.", MIRRORED_KEYS); + let normalized = normalize_config(props); assert_eq!( normalized.get("s3.access.key").map(|s| s.as_str()), Some("AKID") diff --git a/docs/src/architecture.md b/docs/src/architecture.md index e47fb75b..f12950e8 100644 --- a/docs/src/architecture.md +++ b/docs/src/architecture.md @@ -33,7 +33,7 @@ The core crate implements the Paimon table format, including: - **Table** — Table abstraction for reading Paimon tables - **Snapshot & Manifest** — Reading snapshot and manifest metadata - **Schema** — Table schema management and evolution -- **File IO** — Abstraction layer for storage backends (local filesystem, object stores, HDFS) +- **File IO** — Abstraction layer for storage backends (local filesystem, S3) - **File Format** — Parquet file reading and writing via Apache Arrow ### `crates/integrations/datafusion` — DataFusion Integration diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md index 90ec311e..0b6d88aa 100644 --- a/docs/src/getting-started.md +++ b/docs/src/getting-started.md @@ -44,11 +44,6 @@ Available storage features: | `storage-memory` | In-memory | | `storage-s3` | Amazon S3 | | `storage-oss` | Alibaba Cloud OSS| -| `storage-cos` | Tencent Cloud COS| -| `storage-azdls` | Azure Data Lake Storage Gen2 | -| `storage-obs` | Huawei Cloud OBS | -| `storage-gcs` | Google Cloud Storage | -| `storage-hdfs` | HDFS | | `storage-all` | All of the above | ## Catalog Management @@ -83,37 +78,6 @@ options.set("fs.oss.accessKeySecret", "your-access-key-secret"); options.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com"); let catalog = CatalogFactory::create(options).await?; -// Tencent Cloud COS -let mut options = Options::new(); -options.set(CatalogOptions::WAREHOUSE, "cosn://bucket/warehouse"); -options.set("fs.cosn.userinfo.secretId", "your-secret-id"); -options.set("fs.cosn.userinfo.secretKey", "your-secret-key"); -options.set("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"); -let catalog = CatalogFactory::create(options).await?; - -// Azure Data Lake Storage Gen2 -let mut options = Options::new(); -options.set(CatalogOptions::WAREHOUSE, "abfs://filesystem@account.dfs.core.windows.net/warehouse"); -options.set("azure.account-key", "your-account-key"); -let catalog = CatalogFactory::create(options).await?; - -// If you use the short form "abfs://filesystem/warehouse", set the endpoint explicitly: -// options.set("azure.endpoint", "https://account.dfs.core.windows.net"); - -// Huawei Cloud OBS -let mut options = Options::new(); -options.set(CatalogOptions::WAREHOUSE, "obs://bucket/warehouse"); -options.set("fs.obs.access.key", "your-access-key-id"); -options.set("fs.obs.secret.key", "your-secret-access-key"); -options.set("fs.obs.endpoint", "https://obs.cn-north-4.myhuaweicloud.com"); -let catalog = CatalogFactory::create(options).await?; - -// Google Cloud Storage -let mut options = Options::new(); -options.set(CatalogOptions::WAREHOUSE, "gs://bucket/warehouse"); -options.set("gcs.credential-path", "/path/to/service-account.json"); -let catalog = CatalogFactory::create(options).await?; - // REST catalog let mut options = Options::new(); options.set(CatalogOptions::METASTORE, "rest"); diff --git a/docs/src/index.md b/docs/src/index.md index 252c9dd7..b9c14723 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -28,7 +28,7 @@ Apache Paimon Rust provides native Rust libraries for reading and writing Paimon Key features: - Native Rust reader for Paimon table format -- Support for local filesystem, S3, OSS, COS, Azure, OBS, GCS, and HDFS storage backends +- Support for local filesystem, S3, and OSS storage backends - REST Catalog integration - Apache DataFusion integration for SQL queries