diff --git a/antalya/docs/design/alter-table-export-part-partition.md b/antalya/docs/design/alter-table-export-part-partition.md index d513e4b80e41..31871a9008a0 100644 --- a/antalya/docs/design/alter-table-export-part-partition.md +++ b/antalya/docs/design/alter-table-export-part-partition.md @@ -422,12 +422,11 @@ The following notes expand on expected behavior of commands. every active part of partition `p` across all replicas that host it; `system.replicated_partition_exports` converges to `COMPLETED`. -4. Re-issuing the same `EXPORT PARTITION` within - `export_merge_tree_partition_manifest_ttl` is a no-op (no - duplicate files) unless `export_merge_tree_partition_force_export = 1`. This - behavior avoids accidentally exporting the same data twice. Note, however - that forcing the operation is dangerous if ClickHouse can't clean up the - previous operation. In this case you'll potentially commit files twice. +4. `system.replicated_partition_exports` is an append-only history. + Re-issuing `EXPORT PARTITION` for a key that has any entry (in any + terminal state) is rejected unless `export_merge_tree_partition_force_export = 1`. + Forcing the operation is dangerous if ClickHouse can't clean up the + previous operation — you'll potentially commit files twice. 5. Killing an in-flight partition export via `KILL EXPORT PARTITION` transitions status to `KILLED` and stops all replicas' contributions. @@ -481,7 +480,6 @@ The following notes expand on expected behavior of commands. | `export_merge_tree_part_filename_pattern` | query | `{part_name}_{checksum}` | `String` | both | Filename template; supports `{part_name}`, `{checksum}`, `{database}`, `{table}`, server macros. | | `export_merge_tree_partition_force_export` | query | `false` | `Bool` | `EXPORT PARTITION` | Overwrite a live Keeper manifest for the same `(source, destination, partition_id)`. Dangerous — can produce duplicate data on the destination; use with caution. 
| | `export_merge_tree_partition_max_retries` | query | `3` | `UInt64` | `EXPORT PARTITION` | Retry budget applied to both per-part export attempts and per-task commit attempts (Iceberg). The task fails terminally if commit retries alone exceed the budget. | -| `export_merge_tree_partition_manifest_ttl` | query | `180` (seconds) | `UInt64` | `EXPORT PARTITION` | Live-manifest TTL; acts as the idempotency window. Does not interrupt in-flight tasks. Keep this greater than `export_merge_tree_partition_task_timeout_seconds` if you want the `KILLED` entry to remain visible in `system.replicated_partition_exports` after the timeout fires. | | `export_merge_tree_partition_task_timeout_seconds` | query | `3600` (seconds) | `UInt64` (`0`=disable) | `EXPORT PARTITION` | Wall-clock cap for `PENDING` tasks; on expiry transitions to `KILLED` with a timeout reason. Measured from manifest `create_time`. Enforcement latency ≈ one manifest-updater poll cycle (~30s) plus Keeper watch propagation. | | `export_merge_tree_partition_system_table_prefer_remote_information` | query | `false` | `Bool` | `EXPORT PARTITION` | When `true`, `system.replicated_partition_exports` fetches fresh state from Keeper (requires the `MULTI_READ` feature flag); when `false`, uses local cached state. **Default flipped from `true` to `false` in this release** — Keeper round-trips were more expensive than warranted for the typical observability workload. (See NOTE 2.)| | `export_merge_tree_part_file_already_exists_policy` | query | `skip` | `skip` / `error` / `overwrite` | `EXPORT PARTITION` | Per-file policy during partition export. 
| diff --git a/docs/en/antalya/partition_export.md b/docs/en/antalya/partition_export.md index 975915859482..7f6737014da9 100644 --- a/docs/en/antalya/partition_export.md +++ b/docs/en/antalya/partition_export.md @@ -59,7 +59,7 @@ TO TABLE [destination_database.]destination_table - **Type**: `Bool` - **Default**: `false` -- **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition to the same destination before the manifest expires. **IMPORTANT:** this is dangerous because it can lead to duplicated data, use it with caution. +- **Description**: Overwrite an existing entry in `system.replicated_partition_exports` for the same `(source, destination, partition_id)`. This is required because the system table is an append-only history; the only way to re-export the same partition to the same destination is to set this flag. **IMPORTANT:** this is dangerous because it can lead to duplicated data, use it with caution. #### `export_merge_tree_partition_max_retries` (Optional) @@ -67,12 +67,6 @@ TO TABLE [destination_database.]destination_table - **Default**: `3` - **Description**: Maximum number of retries for exporting a merge tree part in an export partition task. If it exceeds, the entire task fails. -#### `export_merge_tree_partition_manifest_ttl` (Optional) - -- **Type**: `UInt64` -- **Default**: `180` (seconds) -- **Description**: Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination. This setting does not affect or delete in-progress tasks; it only cleans up completed ones. 
- #### `export_merge_tree_part_file_already_exists_policy` (Optional) - **Type**: `MergeTreePartExportFileAlreadyExistsPolicy` @@ -109,7 +103,7 @@ When the timeout is exceeded the task transitions to KILLED (same terminal state Notes: - Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation. -- Since both this timeout and `export_merge_tree_partition_manifest_ttl` are measured from `create_time`, keep `export_merge_tree_partition_manifest_ttl` greater than `export_merge_tree_partition_task_timeout_seconds` if you want the KILLED entry to remain visible in `system.replicated_partition_exports` after the timeout fires. +- `system.replicated_partition_exports` is an append-only history: terminal entries (`COMPLETED` / `FAILED` / `KILLED`) are never automatically removed. ## Examples @@ -205,6 +199,68 @@ Status values include: - `FAILED` - Export failed - `KILLED` - Export was cancelled +## TTL EXPORT + +Replicated*MergeTree tables can drive `EXPORT PARTITION` automatically through +the `TTL EXPORT` clause. Once a partition's *top boundary* plus the configured +interval lies in the past, a background task on the table schedules an export +to the destination just as if the operator had run `ALTER TABLE ... EXPORT +PARTITION`. + +### Syntax + +```sql +CREATE TABLE rmt_table (id UInt64, d Date) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/rmt_table', 'r1') +PARTITION BY toYearNumSinceEpoch(d) +ORDER BY tuple() +TTL EXPORT INTERVAL 30 DAY TO TABLE iceberg_table; +``` + +Multiple `TTL EXPORT` clauses are allowed on the same table; each rule +maintains its own progress against its destination. + +### Eligibility + +A partition is eligible when + +``` +top_boundary(partition_id) + INTERVAL <= now() +``` + +`top_boundary` is the *inclusive supremum* of the time range that the +partition can hold. 
For example, a partition keyed by `toYear(d) = 2020` +has a top boundary of `2020-12-31 23:59:59`. + +### Supported `PARTITION BY` expressions + +`TTL EXPORT` requires a closed-form inverse for the partition function. The +curated whitelist is: + +| Family | Expressions | +|-----------------------|----------------------------------------------------------------------| +| Identity | `Date`, `Date32`, `DateTime`, `DateTime64` | +| Generic `to*` | `toYear`, `toYYYYMM`, `toYYYYMMDD`, `toMonday`, `toStartOf{Year,Quarter,Month,Week,Day,Hour,Minute}` | +| Iceberg-spec transforms | `toYearNumSinceEpoch`, `toMonthNumSinceEpoch`, `toRelativeDayNum`, `toRelativeHourNum` | + +**Pairing with Iceberg destinations:** Apache Iceberg only accepts the four +*Iceberg-spec transforms* above as time-bucketing partition functions +(mapping to the Iceberg `year`, `month`, `days`, `hours` transforms). When +exporting into an Iceberg table both the source `PARTITION BY` and the +destination `PARTITION BY` must use the same one of those functions. The +generic `to*` family is appropriate for non-Iceberg destinations +(e.g. Hive-partitioned object storage). + +`icebergTruncate` / `icebergBucket` are not temporal and are not accepted by +`TTL EXPORT`; bare integer columns are also rejected because they carry no +time semantics. + +### Origin column + +Entries scheduled by the background task carry `origin = 'TTL'` in +`system.replicated_partition_exports`; entries from manual `ALTER ... EXPORT +PARTITION` carry `origin = 'ALTER'`. 
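The eligibility rule documented above (`top_boundary(partition_id) + INTERVAL <= now()`) can be sketched in isolation. This is a minimal illustration, not code from this patch: `daysFromCivil`, `topBoundaryOfYear`, and `isEligible` are hypothetical names, the interval is assumed to be a fixed number of seconds, and all times are assumed to be UTC Unix timestamps. The patch itself delegates this work to `MergeTreePartitionTopBoundary::computeTopBoundary` and `addInterval`, whose implementations are not shown here.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: days since 1970-01-01 for a proleptic Gregorian date
// (the well-known "days from civil" arithmetic; no time zones, no leap seconds).
inline std::int64_t daysFromCivil(std::int64_t y, unsigned m, unsigned d)
{
    y -= (m <= 2) ? 1 : 0;  // treat January/February as months of the previous year
    const std::int64_t era = (y >= 0 ? y : y - 399) / 400;
    const unsigned yoe = static_cast<unsigned>(y - era * 400);             // [0, 399]
    const unsigned doy = (153 * (m > 2 ? m - 3 : m + 9) + 2) / 5 + d - 1;  // [0, 365]
    const unsigned doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;            // [0, 146096]
    return era * 146097 + static_cast<std::int64_t>(doe) - 719468;
}

// Hypothetical helper: inclusive top boundary (last second, UTC) of a partition
// keyed by toYear(d) = year, i.e. one second before the next year starts.
inline std::int64_t topBoundaryOfYear(std::int64_t year)
{
    return daysFromCivil(year + 1, 1, 1) * 86400 - 1;
}

// Eligibility as stated in the docs: top_boundary + INTERVAL <= now.
// Assumption: the interval is a plain number of seconds (e.g. 30 days).
inline bool isEligible(std::int64_t top_boundary, std::int64_t interval_seconds, std::int64_t now)
{
    return top_boundary + interval_seconds <= now;
}
```

Under these assumptions a `toYear(d) = 2020` partition has top boundary `1609459199` (`2020-12-31 23:59:59` UTC), and with `INTERVAL 30 DAY` it becomes eligible once `now` reaches `1612051199`.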
+ ## Related Features - [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 79eb5a888bf0..4c539426ed04 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -667,6 +667,7 @@ M(1005, PENDING_MUTATIONS_NOT_ALLOWED) \ M(1006, EXPORT_PARTITION_ALREADY_EXPORTED) \ M(1007, PARTITION_EXPORT_FAILED) \ + M(1008, EXPORT_PARTITION_BACKFILL_NOT_ALLOWED) \ /* See END */ #ifdef APPLY_FOR_EXTERNAL_ERROR_CODES @@ -683,7 +684,7 @@ namespace ErrorCodes APPLY_FOR_ERROR_CODES(M) #undef M - constexpr ErrorCode END = 1007; + constexpr ErrorCode END = 1008; ErrorPairHolder values[END + 1]{}; struct ErrorCodesNames diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 847ef382dcdf..901381571232 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7534,10 +7534,6 @@ Ignore existing partition export and overwrite the zookeeper entry )", 0) \ DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"( Maximum number of retries for exporting a merge tree part in an export partition task -)", 0) \ - DECLARE(UInt64, export_merge_tree_partition_manifest_ttl, 86400, R"( -Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination. -This setting does not affect / delete in progress tasks. It'll only cleanup the completed ones. )", 0) \ DECLARE(UInt64, export_merge_tree_partition_task_timeout_seconds, 3600, R"( Maximum wall-clock duration (in seconds) an export partition task is allowed to remain in the PENDING state before it is auto-killed by the background cleanup loop. @@ -7546,7 +7542,6 @@ When the timeout is exceeded the task transitions to KILLED (same terminal state Notes: - Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation. 
-- Since both this timeout and `export_merge_tree_partition_manifest_ttl` are measured from `create_time`, keep `export_merge_tree_partition_manifest_ttl` greater than `export_merge_tree_partition_task_timeout_seconds` if you want the KILLED entry to remain visible in `system.replicated_partition_exports` after the timeout fires. )", 0) \ DECLARE(MergeTreePartExportFileAlreadyExistsPolicy, export_merge_tree_part_file_already_exists_policy, MergeTreePartExportFileAlreadyExistsPolicy::skip, R"( Possible values: diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 3f354da6f43f..9252d58c88d5 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -123,7 +123,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_remote_initiator_cluster", "", "", "New setting."}, // {"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"}, {"export_merge_tree_partition_task_timeout_seconds", 0, 3600, "New setting to control the timeout for export partition tasks."}, - {"export_merge_tree_partition_manifest_ttl", 180, 86400, "Reasonable default for real usage"}, + /// `export_merge_tree_partition_manifest_ttl` was removed: `system.replicated_partition_exports` is + /// now an append-only history table (entries never expire). See TTL EXPORT support. }); addSettingsChanges(settings_changes_history, "26.1", { @@ -324,7 +325,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, {"export_merge_tree_partition_force_export", false, false, "New setting."}, {"export_merge_tree_partition_max_retries", 3, 3, "New setting."}, - {"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."}, + /// `export_merge_tree_partition_manifest_ttl` removed in later versions (entries never expire). 
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."}, {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."}, {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, diff --git a/src/Parsers/ASTTTLElement.cpp b/src/Parsers/ASTTTLElement.cpp index c6c8d1eb8a08..4811c7e992bc 100644 --- a/src/Parsers/ASTTTLElement.cpp +++ b/src/Parsers/ASTTTLElement.cpp @@ -32,6 +32,21 @@ ASTPtr ASTTTLElement::clone() const void ASTTTLElement::formatImpl(WriteBuffer & ostr, const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const { + if (mode == TTLMode::EXPORT) + { + ostr << "EXPORT "; + auto ttl_expr = ttl(); + auto nested_frame = frame; + if (auto * ast_alias = dynamic_cast(ttl_expr.get()); ast_alias && !ast_alias->tryGetAlias().empty()) + nested_frame.need_parens = true; + ttl_expr->format(ostr, settings, state, nested_frame); + ostr << " TO TABLE "; + if (!destination_database.empty()) + ostr << backQuoteIfNeed(destination_database) << "."; + ostr << backQuoteIfNeed(destination_name); + return; + } + auto ttl_expr = ttl(); auto nested_frame = frame; if (auto * ast_alias = dynamic_cast(ttl_expr.get()); ast_alias && !ast_alias->tryGetAlias().empty()) diff --git a/src/Parsers/ASTTTLElement.h b/src/Parsers/ASTTTLElement.h index 8cef82ed293b..c64e71b3b799 100644 --- a/src/Parsers/ASTTTLElement.h +++ b/src/Parsers/ASTTTLElement.h @@ -15,6 +15,7 @@ class ASTTTLElement : public IAST public: TTLMode mode; DataDestinationType destination_type; + String destination_database; String destination_name; bool if_exists = false; diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index 29ea00e6d04f..94c8858184cb 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -349,6 +349,7 @@ namespace DB MR_MACROS(MONTHS, "MONTHS") \ MR_MACROS(MOVE_PART, 
"MOVE PART") \ MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ + MR_MACROS(EXPORT, "EXPORT") \ MR_MACROS(EXPORT_PART, "EXPORT PART") \ MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \ MR_MACROS(MOVE, "MOVE") \ diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index aebd8d4b1094..68afcdc882fe 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include @@ -2442,6 +2443,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { ParserKeyword s_to_disk(Keyword::TO_DISK); ParserKeyword s_to_volume(Keyword::TO_VOLUME); + ParserKeyword s_to_table(Keyword::TO_TABLE); ParserKeyword s_if_exists(Keyword::IF_EXISTS); ParserKeyword s_delete(Keyword::DELETE); ParserKeyword s_where(Keyword::WHERE); @@ -2449,6 +2451,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserKeyword s_set(Keyword::SET); ParserKeyword s_recompress(Keyword::RECOMPRESS); ParserKeyword s_codec(Keyword::CODEC); + ParserKeyword s_export(Keyword::EXPORT); ParserKeyword s_materialize_ttl(Keyword::MATERIALIZE_TTL); ParserKeyword s_remove_ttl(Keyword::REMOVE_TTL); ParserKeyword s_modify_ttl(Keyword::MODIFY_TTL); @@ -2470,6 +2473,30 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) || s_modify_ttl.checkWithoutMoving(pos, expected)) return false; + /// EXPORT branch: `EXPORT <interval> TO TABLE [db.]name`. + /// Has no preceding column-based expression; the interval is the only argument. 
+ if (s_export.ignore(pos, expected)) + { + ASTPtr interval_expr; + if (!parser_exp.parse(pos, interval_expr, expected)) + return false; + + if (!s_to_table.ignore(pos, expected)) + return false; + + String dest_database; + String dest_table; + if (!parseDatabaseAndTableName(pos, expected, dest_database, dest_table)) + return false; + + auto ttl_element = make_intrusive<ASTTTLElement>(TTLMode::EXPORT, DataDestinationType::TABLE, /*destination_name=*/ String{}, /*if_exists=*/ false); + ttl_element->destination_database = std::move(dest_database); + ttl_element->destination_name = std::move(dest_table); + ttl_element->setTTL(std::move(interval_expr)); + node = ttl_element; + return true; + } + ASTPtr ttl_expr; if (!parser_exp.parse(pos, ttl_expr, expected)) return false; diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index acfabc28ca61..66c871eb6c2b 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -152,6 +152,15 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry } }; +/// Provenance of an export partition manifest. +/// - ALTER: scheduled by `ALTER TABLE ... EXPORT PARTITION ID ...`. +/// - TTL: scheduled by the background TTL EXPORT task. 
+enum class ExportReplicatedMergeTreePartitionOrigin : uint8_t +{ + ALTER, + TTL, +}; + struct ExportReplicatedMergeTreePartitionManifest { String transaction_id; @@ -164,7 +173,6 @@ struct ExportReplicatedMergeTreePartitionManifest std::vector parts; time_t create_time; size_t max_retries; - size_t ttl_seconds; size_t task_timeout_seconds; size_t max_threads; bool parallel_formatting; @@ -175,6 +183,7 @@ struct ExportReplicatedMergeTreePartitionManifest String filename_pattern; bool write_full_path_in_iceberg_metadata = false; String iceberg_metadata_json; + ExportReplicatedMergeTreePartitionOrigin origin = ExportReplicatedMergeTreePartitionOrigin::ALTER; std::string toJsonString() const { @@ -205,9 +214,9 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("filename_pattern", filename_pattern); json.set("create_time", create_time); json.set("max_retries", max_retries); - json.set("ttl_seconds", ttl_seconds); json.set("task_timeout_seconds", task_timeout_seconds); json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata); + json.set("origin", String(magic_enum::enum_name(origin))); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); Poco::JSON::Stringifier::stringify(json, oss); @@ -240,7 +249,6 @@ struct ExportReplicatedMergeTreePartitionManifest manifest.parts.push_back(parts_array->getElement(static_cast(i))); manifest.create_time = json->getValue("create_time"); - manifest.ttl_seconds = json->getValue("ttl_seconds"); manifest.task_timeout_seconds = json->getValue("task_timeout_seconds"); manifest.max_threads = json->getValue("max_threads"); manifest.parallel_formatting = json->getValue("parallel_formatting"); @@ -262,6 +270,14 @@ struct ExportReplicatedMergeTreePartitionManifest manifest.write_full_path_in_iceberg_metadata = json->getValue("write_full_path_in_iceberg_metadata"); + if (json->has("origin")) + { + auto parsed = magic_enum::enum_cast(json->getValue("origin")); + if 
(parsed.has_value()) + manifest.origin = parsed.value(); + /// Older nodes have no `origin` field; default ALTER is the right legacy behavior. + } + return manifest; } }; diff --git a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h index 8af873e0b89c..948b0b0a8faf 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h @@ -56,6 +56,12 @@ struct ExportReplicatedMergeTreePartitionTaskEntry { return manifest.create_time; } + + /// Provenance of this entry: ALTER (user) vs TTL (background task). + ExportReplicatedMergeTreePartitionOrigin getOrigin() const + { + return manifest.origin; + } }; struct ExportPartitionTaskEntryTagByCompositeKey {}; diff --git a/src/Storages/MergeTree/Compaction/MergeSelectorApplier.cpp b/src/Storages/MergeTree/Compaction/MergeSelectorApplier.cpp index e8f1551ade99..0d34a47619f4 100644 --- a/src/Storages/MergeTree/Compaction/MergeSelectorApplier.cpp +++ b/src/Storages/MergeTree/Compaction/MergeSelectorApplier.cpp @@ -48,6 +48,7 @@ struct ChooseContext const PartitionIdToTTLs & next_recompress_times; const time_t current_time; const bool aggressive; + const TTLDropDeletePartitionFilter & ttl_drop_delete_partition_filter; }; MergeSelectorChoices pack(const ChooseContext & ctx, PartsRanges && ranges, MergeType type) @@ -70,23 +71,39 @@ MergeSelectorChoices pack(const ChooseContext & ctx, PartsRanges && ranges, Merg MergeSelectorChoices tryChooseTTLMerge(const ChooseContext & ctx) { - /// Drop parts - 1 priority - if (!ctx.merge_constraints.empty()) + /// Build a filtered ranges view for TTL DROP / TTL DELETE that excludes partitions whose + /// destructive TTL is deferred (e.g. still awaiting TTL EXPORT completion). All parts of a + /// single range share the same partition_id so we can decide on the first part. 
+ PartsRanges ttl_ranges_storage; + const PartsRanges * ttl_ranges_ptr = &ctx.ranges; + if (ctx.ttl_drop_delete_partition_filter) + { + ttl_ranges_storage.reserve(ctx.ranges.size()); + for (const auto & range : ctx.ranges) + { + if (range.empty() || ctx.ttl_drop_delete_partition_filter(range.front().info.getPartitionId())) + ttl_ranges_storage.push_back(range); + } + ttl_ranges_ptr = &ttl_ranges_storage; + } + const PartsRanges & ttl_ranges = *ttl_ranges_ptr; + + if (!ctx.merge_constraints.empty() && !ttl_ranges.empty()) { /// The size of the completely expired part of TTL drop is not affected by the merge pressure and the size of the storage space. std::vector ttl_constraints(ctx.merge_constraints.size(), {std::numeric_limits::max(), std::numeric_limits::max()}); TTLPartDropMergeSelector drop_ttl_selector(ctx.current_time, ctx.merge_tree_settings[MergeTreeSetting::max_parts_to_merge_at_once]); - if (auto merge_ranges = drop_ttl_selector.select(ctx.ranges, ttl_constraints, ctx.range_filter); !merge_ranges.empty()) + if (auto merge_ranges = drop_ttl_selector.select(ttl_ranges, ttl_constraints, ctx.range_filter); !merge_ranges.empty()) return pack(ctx, std::move(merge_ranges), MergeType::TTLDrop); } /// Delete rows - 2 priority - if (!ctx.merge_constraints.empty() && !ctx.merge_tree_settings[MergeTreeSetting::ttl_only_drop_parts]) + if (!ctx.merge_constraints.empty() && !ctx.merge_tree_settings[MergeTreeSetting::ttl_only_drop_parts] && !ttl_ranges.empty()) { TTLRowDeleteMergeSelector delete_ttl_selector(ctx.next_delete_times, ctx.current_time); - if (auto merge_ranges = delete_ttl_selector.select(ctx.ranges, ctx.merge_constraints, ctx.range_filter); !merge_ranges.empty()) + if (auto merge_ranges = delete_ttl_selector.select(ttl_ranges, ctx.merge_constraints, ctx.range_filter); !merge_ranges.empty()) return pack(ctx, std::move(merge_ranges), MergeType::TTLDelete); } @@ -167,11 +184,13 @@ MergeSelectorApplier::MergeSelectorApplier( std::vector && merge_constraints_, 
bool merge_with_ttl_allowed_, bool aggressive_, - IMergeSelector::RangeFilter range_filter_) + IMergeSelector::RangeFilter range_filter_, + TTLDropDeletePartitionFilter ttl_drop_delete_partition_filter_) : merge_constraints(std::move(merge_constraints_)) , merge_with_ttl_allowed(merge_with_ttl_allowed_) , aggressive(aggressive_) , range_filter(std::move(range_filter_)) + , ttl_drop_delete_partition_filter(std::move(ttl_drop_delete_partition_filter_)) { chassert(!merge_constraints.empty(), "At least one merge constraint should be passed"); @@ -204,6 +223,7 @@ MergeSelectorChoices MergeSelectorApplier::chooseMergesFrom( .next_recompress_times = next_recompress_times, .current_time = current_time, .aggressive = aggressive, + .ttl_drop_delete_partition_filter = ttl_drop_delete_partition_filter, }; if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && can_use_ttl_merges) diff --git a/src/Storages/MergeTree/Compaction/MergeSelectorApplier.h b/src/Storages/MergeTree/Compaction/MergeSelectorApplier.h index e96346c328d6..04bf4967df53 100644 --- a/src/Storages/MergeTree/Compaction/MergeSelectorApplier.h +++ b/src/Storages/MergeTree/Compaction/MergeSelectorApplier.h @@ -24,6 +24,10 @@ struct MergeSelectorChoice }; using MergeSelectorChoices = std::vector; +/// Returns true if a TTL DROP / TTL DELETE merge is allowed for the given partition_id. +/// Used to defer destructive TTL merges for partitions still awaiting TTL EXPORT completion. +using TTLDropDeletePartitionFilter = std::function; + class MergeSelectorApplier { public: @@ -31,12 +35,16 @@ class MergeSelectorApplier const bool merge_with_ttl_allowed = false; const bool aggressive = false; const IMergeSelector::RangeFilter range_filter = nullptr; + /// If set, TTL DROP / TTL DELETE selectors only consider partitions for which this returns true. + /// Recompression TTL and regular merges ignore this filter. 
+ const TTLDropDeletePartitionFilter ttl_drop_delete_partition_filter = nullptr; MergeSelectorApplier( std::vector && merge_constraints_, bool merge_with_ttl_allowed_, bool aggressive_, - IMergeSelector::RangeFilter range_filter_); + IMergeSelector::RangeFilter range_filter_, + TTLDropDeletePartitionFilter ttl_drop_delete_partition_filter_ = nullptr); MergeSelectorChoices chooseMergesFrom( const PartsRanges & ranges, diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 3ce171e5fa3e..79171d4efb16 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -128,7 +128,11 @@ namespace } /* - Remove expired entries and fix non-committed exports that have already exported all parts. + Fix non-committed exports that have already exported all parts, and enforce task timeouts. + + Note: terminal entries (COMPLETED / FAILED / KILLED) are *not* expired here. The + `system.replicated_partition_exports` table is an append-only history that doubles as the + marker source for TTL EXPORT — entries must remain visible forever. Return values: - true: the cleanup was successful, the entry is removed from the entries_by_key container and the function returns true. Proceed to the next entry. 
@@ -149,33 +153,17 @@ namespace const LoggerPtr & log, const ContextPtr & storage_context, StorageReplicatedMergeTree & storage, - const std::string & key, const ExportReplicatedMergeTreePartitionManifest & metadata, const time_t now, const bool is_pending, - auto & entries_by_key, std::vector & deferred_commits ) { - bool has_expired = metadata.create_time < now - static_cast(metadata.ttl_seconds); - bool task_timed_out = is_pending && metadata.task_timeout_seconds > 0 && metadata.create_time + static_cast(metadata.task_timeout_seconds) < now; - if (has_expired && !is_pending) - { - zk->tryRemoveRecursive(fs::path(entry_path)); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemoveRecursive); - auto it = entries_by_key.find(key); - if (it != entries_by_key.end()) - entries_by_key.erase(it); - LOG_INFO(log, "ExportPartition Manifest Updating Task: Removed {}: expired", key); - - return true; - } - else if (task_timed_out) + if (task_timed_out) { const std::string status_path = fs::path(entry_path) / "status"; @@ -335,6 +323,7 @@ std::vector ExportPartitionManifestUpdatingTask:: info.parts_to_do = snapshot.manifest.parts.size(); info.parts = std::move(snapshot.manifest.parts); info.status = magic_enum::enum_name(snapshot.status); + info.origin = magic_enum::enum_name(snapshot.manifest.origin); info.last_exception_per_replica.reserve(snapshot.last_exception_per_replica.size()); size_t total_exception_count = 0; @@ -478,11 +467,9 @@ void ExportPartitionManifestUpdatingTask::poll() storage.log.load(), storage.getContext(), storage, - key, metadata, now, *status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, - entries_by_key, deferred_commits); if (cleanup_successful) diff --git a/src/Storages/MergeTree/ExportPartitionTTLTask.cpp b/src/Storages/MergeTree/ExportPartitionTTLTask.cpp new file mode 100644 index 000000000000..40221b21f9c1 --- /dev/null +++ 
b/src/Storages/MergeTree/ExportPartitionTTLTask.cpp @@ -0,0 +1,225 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + + +namespace DB +{ + +ExportPartitionTTLTask::ExportPartitionTTLTask(StorageReplicatedMergeTree & storage_) + : storage(storage_) +{ +} + +namespace +{ + +/// Default cadence between iterations when there is nothing immediately ready to export. +constexpr std::chrono::milliseconds kDefaultDelay{10'000}; + +/// Soft cap on the per-rule wait so we don't sleep for an hour if a partition becomes +/// eligible right after we exit run(). +constexpr std::chrono::milliseconds kMaxDelay{60 * 60 * 1000}; + +bool entryCoversPartition( + const ExportReplicatedMergeTreePartitionTaskEntry & entry, + const String & destination_database, + const String & destination_table, + const String & partition_id) +{ + return entry.manifest.destination_database == destination_database + && entry.manifest.destination_table == destination_table + && entry.manifest.partition_id == partition_id; +} + +} + +std::chrono::milliseconds ExportPartitionTTLTask::run() +{ + auto log = storage.log.load(); + + const auto metadata = storage.getInMemoryMetadataPtr(); + if (!metadata || !metadata->hasAnyExportTTL()) + return kDefaultDelay; + + /// Validate the partition expression once per tick. If it's unsupported (e.g. user manually + /// modified the metadata to add a TTL EXPORT with a non-curated partition expression), give up. + if (!MergeTreePartitionTopBoundary::isPartitionExpressionSupported(metadata->getPartitionKey())) + { + LOG_WARNING(log, + "ExportPartitionTTLTask: partition expression of table {} is not in the curated EXPORT TTL whitelist; " + "skipping. 
Drop the TTL EXPORT rule or change PARTITION BY to a supported expression.", + storage.getStorageID().getNameForLogs()); + return kDefaultDelay; + } + + const auto all_partition_ids_set = storage.getAllPartitionIds(); + if (all_partition_ids_set.empty()) + return kDefaultDelay; + + /// Sort partitions numerically (per the curated whitelist they're all integer-valued). + /// Lex compare is not always equivalent (DateTime unix-ts strings differ in length). + std::vector all_partition_ids(all_partition_ids_set.begin(), all_partition_ids_set.end()); + std::sort(all_partition_ids.begin(), all_partition_ids.end(), + [&](const String & a, const String & b) + { + return MergeTreePartitionTopBoundary::comparePartitionIds(metadata->getPartitionKey(), a, b) < 0; + }); + + const auto now = time(nullptr); + auto min_delay = kDefaultDelay; + + for (const auto & rule : metadata->table_ttl.export_ttl) + { + /// Resolve the destination once, through the storage helper. This yields a non-empty + /// database (an omitted `db.` qualifier resolves to the source table's database) that is + /// used both for cache lookups below and as `command.to_database` when scheduling the + /// export, so the database written into the manifest always matches the lookup keys. + const QualifiedTableName destination = storage.resolveExportTTLDestination(rule); + + /// Quickly skip rules that already have an in-flight TTL entry for this destination + /// (one-export-at-a-time per destination), and grab the marker. 
+ String marker; + bool has_pending_for_destination = false; + { + std::lock_guard lock(storage.export_merge_tree_partition_mutex); + for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key) + { + if (entry.manifest.origin != ExportReplicatedMergeTreePartitionOrigin::TTL) + continue; + if (entry.manifest.destination_database != destination.database + || entry.manifest.destination_table != destination.table) + continue; + if (entry.status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) + { + has_pending_for_destination = true; + break; + } + } + + /// Marker is derived on demand from the manifest cache (the only source of truth for + /// TTL-origin entries). This keeps `StorageReplicatedMergeTree` free of a redundant + /// derived data structure that has to be maintained from multiple write sites. + marker = storage.computeTTLExportMarkerLocked(destination); + } + + if (has_pending_for_destination) + continue; + + /// Walk candidates: partition_ids strictly greater than the marker, in ascending order. + for (const auto & partition_id : all_partition_ids) + { + if (!marker.empty() + && MergeTreePartitionTopBoundary::comparePartitionIds(metadata->getPartitionKey(), partition_id, marker) <= 0) + { + continue; + } + + /// Anti-duplication: if any entry (any origin, any status) covers this partition for + /// this destination, skip and advance the marker past it on the next add/recompute. + bool already_covered = false; + { + std::lock_guard lock(storage.export_merge_tree_partition_mutex); + for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key) + { + if (entryCoversPartition(entry, destination.database, destination.table, partition_id)) + { + already_covered = true; + break; + } + } + } + if (already_covered) + { + /// The entry is already in the cache (e.g. user manually exported it with ALTER). 
+ /// We don't update the TTL marker here because the entry is not origin=TTL; we just + /// skip past it on this pass and the next eligible TTL export will move the marker. + continue; + } + + /// Has the partition's TTL window elapsed? + const time_t top_boundary = MergeTreePartitionTopBoundary::computeTopBoundary( + metadata->getPartitionKey(), partition_id); + const time_t fires_at = MergeTreePartitionTopBoundary::addInterval( + top_boundary, rule.export_interval_kind, rule.export_interval_count); + + if (fires_at > now) + { + /// Not yet eligible. Sleep just long enough to wake up when it is. + const auto wait_ms = std::min( + std::chrono::duration_cast(kMaxDelay).count(), + static_cast(fires_at - now) * 1000); + if (wait_ms > 0 && std::chrono::milliseconds(wait_ms) < min_delay) + min_delay = std::chrono::milliseconds(wait_ms); + break; + } + + /// Schedule the export. Catch any exception (including the experimental-flag gate) + /// and keep looping over the next rule. + try + { + PartitionCommand command; + command.type = PartitionCommand::EXPORT_PARTITION; + auto partition_ast = make_intrusive(); + partition_ast->setPartitionID(make_intrusive(partition_id)); + command.partition = partition_ast; + command.to_database = destination.database; + command.to_table = destination.table; + + /// A new context — using the global query context — keeps server-level settings. + /// We don't want user-session overrides for the background scheduler. 
+ auto background_context = Context::createCopy(storage.getContext()); + + background_context->setSetting("allow_insert_into_iceberg", 1); + + storage.exportPartitionToTableWithOrigin( + command, background_context, ExportReplicatedMergeTreePartitionOrigin::TTL); + + LOG_INFO(log, + "ExportPartitionTTLTask: scheduled TTL export of partition {} from {} to {}.{}", + partition_id, + storage.getStorageID().getNameForLogs(), + destination.database, destination.table); + } + catch (const Exception & e) + { + /// SUPPORT_IS_DISABLED is the typical "experimental flag is off" exception, but every + /// Exception lands here. Log once per minute per (table, destination) to avoid log spam. + auto & last_at = last_experimental_off_log_at[destination]; + if (last_at + 60 < now) + { + LOG_INFO(log, + "ExportPartitionTTLTask: could not schedule TTL export of partition {} from {} to {}.{}: {}", + partition_id, + storage.getStorageID().getNameForLogs(), + destination.database, destination.table, + e.message()); + last_at = now; + } + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } + + /// At most one TTL-scheduled export per (src, dst) per tick: break out to the next rule. + break; + } + } + + return min_delay; +} + +} diff --git a/src/Storages/MergeTree/ExportPartitionTTLTask.h b/src/Storages/MergeTree/ExportPartitionTTLTask.h new file mode 100644 index 000000000000..3f648f026b04 --- /dev/null +++ b/src/Storages/MergeTree/ExportPartitionTTLTask.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ + +class StorageReplicatedMergeTree; + +/// Background task that periodically scans `metadata.table_ttl.export_ttl` rules and schedules +/// partition exports for partitions whose top-boundary + TTL interval has elapsed. +/// +/// Invariants: +/// - Only one TTL-originated export is in flight per `(source, destination)` pair. The marker, derived +/// on demand from the storage's manifest cache, tells the task which partition_ids have already been TTL-scheduled. 
+/// - The task happily runs even when `allow_experimental_export_merge_tree_partition` is OFF: +/// `StorageReplicatedMergeTree::exportPartitionToTableWithOrigin` will throw early in that case +/// and we just log + reschedule. This keeps the TTL configuration coherent on every replica +/// regardless of the experimental gate. +/// - The task is replica-local; ZK uniqueness on `export_key` makes concurrent attempts harmless. +class ExportPartitionTTLTask +{ +public: + explicit ExportPartitionTTLTask(StorageReplicatedMergeTree & storage); + + /// One iteration. Returns the suggested delay (in milliseconds) for the next scheduling. + std::chrono::milliseconds run(); + +private: + StorageReplicatedMergeTree & storage; + + /// Throttle log spam: per (table, destination), keep timestamp of last "experimental flag off" + /// log so we report it at most once a minute. + std::unordered_map last_experimental_off_log_at; +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index a3583fc0d081..e82086ead0fb 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -605,6 +606,16 @@ MergeTreeTemporaryPartPtr MergeTreeDataWriter::writeTempPartImpl( ContextPtr context, UInt64 block_number) { + /// Backfill rejection: once a partition has been TTL-exported (`origin=TTL`) for any destination, + /// inserts targeting that partition or any older partition for the same destination are rejected + /// unless the operator explicitly enables `allow_inserts_into_exported_partition`. This guards + /// against silent duplication: exported partitions are owned by the destination storage. 
+ if (auto * replicated = dynamic_cast(&data); + replicated && !isPatchPartitionId(partition_id)) + { + replicated->checkInsertAllowedByExportTTLMarker(partition_id); + } + auto temp_part = std::make_unique(); Block & block = *block_with_partition.block; MergeTreePartition & partition = block_with_partition.partition; diff --git a/src/Storages/MergeTree/MergeTreePartitionTopBoundary.cpp b/src/Storages/MergeTree/MergeTreePartitionTopBoundary.cpp new file mode 100644 index 000000000000..c0d9b5f59ff9 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreePartitionTopBoundary.cpp @@ -0,0 +1,343 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_TTL_EXPRESSION; + extern const int LOGICAL_ERROR; +} + +namespace MergeTreePartitionTopBoundary +{ + +namespace +{ + +enum class PartitionKind : uint8_t +{ + IdentityDate, /// PARTITION BY ; partition_id = yyyymmdd + IdentityDate32, /// PARTITION BY ; partition_id = yyyymmdd + IdentityDateTime, /// PARTITION BY ; partition_id = unix seconds + IdentityDateTime64, /// PARTITION BY ; partition_id = raw underlying (rare) + ToYear, /// partition_id = YYYY + ToYYYYMM, /// partition_id = YYYYMM + ToYYYYMMDD, /// partition_id = YYYYMMDD + ToStartOfYear, /// output Date(start of year) -> yyyymmdd + ToStartOfQuarter, /// output Date(start of quarter) -> yyyymmdd + ToStartOfMonth, /// output Date(start of month) -> yyyymmdd + ToStartOfWeek, /// output Date(Sunday) -> yyyymmdd + ToMonday, /// output Date(Monday) -> yyyymmdd + ToStartOfDay, /// output DateTime(start of day) -> unix seconds + ToStartOfHour, /// output DateTime(start of hour) -> unix seconds + ToStartOfMinute, /// output DateTime(start of minute) -> unix seconds + ToYearNumSinceEpoch, /// partition_id = years since 1970 (Iceberg "year" transform) + ToMonthNumSinceEpoch,/// partition_id = months since 1970-01 (Iceberg 
"month" transform) + ToRelativeDayNum, /// partition_id = days since 1970-01-01 (Iceberg "days" transform) + ToRelativeHourNum, /// partition_id = hours since 1970-01-01 00 (Iceberg "hours" transform) +}; + +struct Classification +{ + PartitionKind kind; +}; + +std::optional classify(const KeyDescription & partition_key) +{ + if (!partition_key.expression_list_ast) + return {}; + + const auto & children = partition_key.expression_list_ast->children; + if (children.size() != 1) + return {}; + + if (partition_key.sample_block.columns() != 1) + return {}; + + const auto & sample_type = partition_key.sample_block.getByPosition(0).type; + const auto & node = children[0]; + + /// Identity partition keys: a bare column name. + if (const auto * ident = node->as()) + { + (void)ident; + if (typeid_cast(sample_type.get())) + return Classification{PartitionKind::IdentityDate}; + if (typeid_cast(sample_type.get())) + return Classification{PartitionKind::IdentityDate32}; + if (typeid_cast(sample_type.get())) + return Classification{PartitionKind::IdentityDateTime}; + if (typeid_cast(sample_type.get())) + return Classification{PartitionKind::IdentityDateTime64}; + return {}; + } + + /// Function-based partition keys: must be a single-argument call on a column. 
+ if (const auto * func = node->as()) + { + if (!func->arguments || func->arguments->children.size() != 1) + return {}; + if (!func->arguments->children[0]->as()) + return {}; + + const String & name = func->name; + if (name == "toYear") + return Classification{PartitionKind::ToYear}; + if (name == "toYYYYMM") + return Classification{PartitionKind::ToYYYYMM}; + if (name == "toYYYYMMDD") + return Classification{PartitionKind::ToYYYYMMDD}; + if (name == "toStartOfYear") + return Classification{PartitionKind::ToStartOfYear}; + if (name == "toStartOfQuarter") + return Classification{PartitionKind::ToStartOfQuarter}; + if (name == "toStartOfMonth") + return Classification{PartitionKind::ToStartOfMonth}; + if (name == "toStartOfWeek") + return Classification{PartitionKind::ToStartOfWeek}; + if (name == "toMonday") + return Classification{PartitionKind::ToMonday}; + if (name == "toStartOfDay") + return Classification{PartitionKind::ToStartOfDay}; + if (name == "toStartOfHour") + return Classification{PartitionKind::ToStartOfHour}; + if (name == "toStartOfMinute") + return Classification{PartitionKind::ToStartOfMinute}; + if (name == "toYearNumSinceEpoch") + return Classification{PartitionKind::ToYearNumSinceEpoch}; + if (name == "toMonthNumSinceEpoch") + return Classification{PartitionKind::ToMonthNumSinceEpoch}; + if (name == "toRelativeDayNum") + return Classification{PartitionKind::ToRelativeDayNum}; + if (name == "toRelativeHourNum") + return Classification{PartitionKind::ToRelativeHourNum}; + } + + return {}; +} + +UInt64 parseUnsigned(const String & partition_id) +{ + ReadBufferFromString buf(partition_id); + UInt64 value; + readText(value, buf); + assertEOF(buf); + return value; +} + +/// Convert yyyymmdd to (year, month, day) tuple. 
+struct YMD { Int16 year; UInt8 month; UInt8 day; }; + +YMD parseYYYYMMDD(UInt64 yyyymmdd) +{ + return YMD{ + .year = static_cast(yyyymmdd / 10000), + .month = static_cast((yyyymmdd / 100) % 100), + .day = static_cast(yyyymmdd % 100), + }; +} + +time_t makeDateTimeAtEndOfDay(Int16 year, UInt8 month, UInt8 day) +{ + const auto & lut = DateLUT::serverTimezoneInstance(); + time_t start_of_day = lut.makeDateTime(year, month, day, 0, 0, 0); + return start_of_day + 86400 - 1; +} + +} + +bool isPartitionExpressionSupported(const KeyDescription & partition_key) +{ + return classify(partition_key).has_value(); +} + +void checkPartitionExpressionSupported(const KeyDescription & partition_key) +{ + if (!isPartitionExpressionSupported(partition_key)) + { + throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, + "TTL EXPORT requires the PARTITION BY expression to be one of: identity Date/Date32/DateTime/DateTime64 column, " + "toYear, toYYYYMM, toYYYYMMDD, toMonday, toStartOf{{Year,Quarter,Month,Week,Day,Hour,Minute}}, " + "toYearNumSinceEpoch, toMonthNumSinceEpoch, toRelativeDayNum, toRelativeHourNum."); + } +} + +time_t computeTopBoundary(const KeyDescription & partition_key, const String & partition_id) +{ + const auto cls = classify(partition_key); + if (!cls) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "computeTopBoundary called for unsupported partition expression"); + + const auto & lut = DateLUT::serverTimezoneInstance(); + const UInt64 raw = parseUnsigned(partition_id); + + switch (cls->kind) + { + case PartitionKind::IdentityDate: + case PartitionKind::IdentityDate32: + { + const auto ymd = parseYYYYMMDD(raw); + return makeDateTimeAtEndOfDay(ymd.year, ymd.month, ymd.day); + } + case PartitionKind::IdentityDateTime: + case PartitionKind::IdentityDateTime64: + { + /// DateTime / DateTime64 identity partitioning groups all rows with the same value into a single + /// partition. The supremum of that range is the value itself. 
+ return static_cast(raw); + } + case PartitionKind::ToYear: + { + const Int16 year = static_cast(raw); + return makeDateTimeAtEndOfDay(year, 12, 31); + } + case PartitionKind::ToYYYYMM: + { + const Int16 year = static_cast(raw / 100); + const UInt8 month = static_cast(raw % 100); + const UInt8 last_day = lut.daysInMonth(year, month); + return makeDateTimeAtEndOfDay(year, month, last_day); + } + case PartitionKind::ToYYYYMMDD: + { + const auto ymd = parseYYYYMMDD(raw); + return makeDateTimeAtEndOfDay(ymd.year, ymd.month, ymd.day); + } + case PartitionKind::ToStartOfYear: + { + const auto ymd = parseYYYYMMDD(raw); + /// Range is the whole year. + return makeDateTimeAtEndOfDay(ymd.year, 12, 31); + } + case PartitionKind::ToStartOfQuarter: + { + const auto ymd = parseYYYYMMDD(raw); + /// Range is the whole quarter (3 months starting from the start month). + const UInt8 last_month_in_quarter = static_cast(ymd.month + 2); + const UInt8 last_day = lut.daysInMonth(ymd.year, last_month_in_quarter); + return makeDateTimeAtEndOfDay(ymd.year, last_month_in_quarter, last_day); + } + case PartitionKind::ToStartOfMonth: + { + const auto ymd = parseYYYYMMDD(raw); + const UInt8 last_day = lut.daysInMonth(ymd.year, ymd.month); + return makeDateTimeAtEndOfDay(ymd.year, ymd.month, last_day); + } + case PartitionKind::ToStartOfWeek: + case PartitionKind::ToMonday: + { + const auto ymd = parseYYYYMMDD(raw); + const time_t start_of_day = lut.makeDateTime(ymd.year, ymd.month, ymd.day, 0, 0, 0); + /// One full week = 7 days. + return start_of_day + 7 * 86400 - 1; + } + case PartitionKind::ToStartOfDay: + { + return static_cast(raw) + 86400 - 1; + } + case PartitionKind::ToStartOfHour: + { + return static_cast(raw) + 3600 - 1; + } + case PartitionKind::ToStartOfMinute: + { + return static_cast(raw) + 60 - 1; + } + case PartitionKind::ToYearNumSinceEpoch: + { + /// N years since 1970 -> top boundary is the last second of year 1970 + N. 
+ const Int16 year = static_cast(1970 + raw); + return makeDateTimeAtEndOfDay(year, 12, 31); + } + case PartitionKind::ToMonthNumSinceEpoch: + { + /// N months since 1970-01 -> top boundary is the last second of that month. + const Int16 year = static_cast(1970 + raw / 12); + const UInt8 month = static_cast((raw % 12) + 1); + const UInt8 last_day = lut.daysInMonth(year, month); + return makeDateTimeAtEndOfDay(year, month, last_day); + } + case PartitionKind::ToRelativeDayNum: + { + /// N days since 1970-01-01 in the server timezone -> top boundary is the last second of that day. + const ExtendedDayNum day_num{static_cast(raw)}; + const time_t start_of_day = lut.fromDayNum(day_num); + return start_of_day + 86400 - 1; + } + case PartitionKind::ToRelativeHourNum: + { + /// N hours since 1970-01-01 00:00 -> top boundary is the last second of that hour. + /// NOTE (strtgbb): for timezones whose offset is not a whole multiple of one hour at the + /// Unix epoch (Pacific/Pitcairn, Pacific/Kiritimati) this is off by a few seconds, which + /// is acceptable for TTL EXPORT (it errs on the side of treating the partition as + /// expired slightly later than ideal, never sooner). + return static_cast(raw) * 3600 + 3600 - 1; + } + } + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable partition kind in computeTopBoundary"); +} + +time_t addInterval(time_t top_boundary, IntervalKind kind, Int64 count) +{ + const auto & lut = DateLUT::serverTimezoneInstance(); + switch (kind.kind) + { + case IntervalKind::Kind::Nanosecond: + case IntervalKind::Kind::Microsecond: + case IntervalKind::Kind::Millisecond: + /// Sub-second intervals round to 0 seconds; this is fine for export TTL which works in seconds. 
+ return top_boundary; + case IntervalKind::Kind::Second: + return top_boundary + count; + case IntervalKind::Kind::Minute: + return top_boundary + count * 60; + case IntervalKind::Kind::Hour: + return top_boundary + count * 3600; + case IntervalKind::Kind::Day: + return lut.addDays(top_boundary, count); + case IntervalKind::Kind::Week: + return lut.addWeeks(top_boundary, count); + case IntervalKind::Kind::Month: + return lut.addMonths(static_cast(top_boundary), count); + case IntervalKind::Kind::Quarter: + return lut.addMonths(static_cast(top_boundary), count * 3); + case IntervalKind::Kind::Year: + return lut.addYears(static_cast(top_boundary), count); + } + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown IntervalKind {}", static_cast(kind.kind)); +} + +int comparePartitionIds(const KeyDescription & partition_key, const String & a, const String & b) +{ + const auto cls = classify(partition_key); + if (!cls) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "comparePartitionIds called for unsupported partition expression"); + + const UInt64 va = parseUnsigned(a); + const UInt64 vb = parseUnsigned(b); + if (va < vb) return -1; + if (va > vb) return 1; + return 0; +} + +} + +} diff --git a/src/Storages/MergeTree/MergeTreePartitionTopBoundary.h b/src/Storages/MergeTree/MergeTreePartitionTopBoundary.h new file mode 100644 index 000000000000..ba40307df554 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreePartitionTopBoundary.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +struct KeyDescription; + +/// Helpers for deriving a partition's "top boundary" (the supremum of the underlying date/time +/// values that fall into the partition) for the TTL EXPORT feature. +/// +/// Supported PARTITION BY expressions (the curated whitelist): +/// - Identity Date / Date32 / DateTime / DateTime64 column. +/// - toYear / toYYYYMM / toYYYYMMDD / toMonday. 
+/// - toStartOfYear / toStartOfQuarter / toStartOfMonth / toStartOfWeek / toStartOfDay / toStartOfHour / toStartOfMinute. +/// - toYearNumSinceEpoch / toMonthNumSinceEpoch / toRelativeDayNum / toRelativeHourNum. +/// +/// The boundary is returned as the last second of the partition's range, expressed in unix +/// seconds (calendar boundaries are evaluated in the server timezone). Adding the TTL interval to this boundary gives the earliest +/// time at which the partition becomes eligible for export. +namespace MergeTreePartitionTopBoundary +{ + /// Returns true iff the partition key is in the curated whitelist. + bool isPartitionExpressionSupported(const KeyDescription & partition_key); + + /// Throws BAD_TTL_EXPRESSION if not supported, with an explanatory message. + void checkPartitionExpressionSupported(const KeyDescription & partition_key); + + /// Compute the top boundary (inclusive) of the partition identified by `partition_id`. + /// Returns the value as unix seconds. The partition expression must be supported. + /// Throws on parse failure or unsupported partition expression. + time_t computeTopBoundary(const KeyDescription & partition_key, const String & partition_id); + + /// Add the TTL interval to the top boundary. Honors variable-length kinds (Month/Quarter/Year) + /// exactly via DateLUT arithmetic. + time_t addInterval(time_t top_boundary, IntervalKind kind, Int64 count); + + /// Compare two partition_ids numerically (as if by their underlying value). + /// Returns negative / zero / positive. The partition expression must be supported. + int comparePartitionIds(const KeyDescription & partition_key, const String & a, const String & b); +} + +} diff --git a/src/Storages/MergeTree/MergeTreeSettings.cpp b/src/Storages/MergeTree/MergeTreeSettings.cpp index a6ed4d9ec463..0caa6d119105 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeSettings.cpp @@ -1518,6 +1518,11 @@ namespace ErrorCodes Background task which reduces blocking parts for shared merge tree tables. 
Only in ClickHouse Cloud )", 0) \ + DECLARE(Bool, allow_inserts_into_exported_partition, false, R"( + If false (the default), reject inserts whose partition_id is at-or-below any destination's TTL EXPORT marker. + Once a partition has been TTL-exported the destination storage owns that data; allowing inserts after the fact + can produce silent data duplication. Set to true to override this guard for backfill scenarios. + )", 0) \ DECLARE(Seconds, refresh_parts_interval, 0, R"( If it is greater than zero - refresh the list of data parts from the underlying filesystem to check if the data was updated under the hood. It can be set only if the table is located on readonly disks (which means that this is a readonly replica, while data is being written by another replica). diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index d06b6d7a9d24..9746bb7120b4 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -193,6 +193,13 @@ bool ReplicatedMergeTreeRestartingThread::runImpl() storage.export_merge_tree_partition_status_handling_task->activateAndSchedule(); } + /// TTL EXPORT task is *not* gated by the experimental flag: it is always activated so that + /// `TTL EXPORT` configuration is honored as soon as the metadata says so. If the experimental + /// flag is off the task simply fails to schedule each tick and reschedules with the + /// SUPPORT_IS_DISABLED reason logged. 
+ if (storage.export_merge_tree_partition_ttl_schedule) + storage.export_merge_tree_partition_ttl_schedule->activateAndSchedule(); + storage.cleanup_thread.start(); storage.async_block_ids_cache.start(); storage.part_check_thread.start(); diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index e63605989cde..08e928fe2209 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -75,6 +76,7 @@ namespace ErrorCodes extern const int NO_REPLICA_NAME_GIVEN; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; extern const int SUPPORT_IS_DISABLED; + extern const int NOT_IMPLEMENTED; } @@ -711,6 +713,19 @@ static StoragePtr create(const StorageFactory::Arguments & args) { metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST( args.storage_def->ttl_table->ptr(), metadata.columns, context, metadata.primary_key, allow_suspicious_ttl); + + if (!metadata.table_ttl.export_ttl.empty()) + { + /// EXPORT TTL relies on the ALTER ... EXPORT PARTITION pipeline, which is only + /// available for Replicated*MergeTree. 
+ if (!replicated) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "TTL EXPORT is only supported for Replicated*MergeTree, got {}", args.engine_name); + } + + MergeTreePartitionTopBoundary::checkPartitionExpressionSupported(metadata.partition_key); + } } storage_settings->loadFromQuery(*args.storage_def, context, LoadingStrictnessLevel::ATTACH <= args.mode); diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index bb0e26c4cea0..a2ae12c7698a 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -289,12 +289,12 @@ TTLTableDescription StorageInMemoryMetadata::getTableTTLs() const bool StorageInMemoryMetadata::hasAnyTableTTL() const { - return hasAnyMoveTTL() || hasRowsTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL(); + return hasAnyMoveTTL() || hasRowsTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL() || hasAnyExportTTL(); } bool StorageInMemoryMetadata::hasOnlyRowsTTL() const { - bool has_any_other_ttl = hasAnyMoveTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL() || hasAnyColumnTTL(); + bool has_any_other_ttl = hasAnyMoveTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL() || hasAnyColumnTTL() || hasAnyExportTTL(); return hasRowsTTL() && !has_any_other_ttl; } @@ -358,6 +358,16 @@ bool StorageInMemoryMetadata::hasAnyGroupByTTL() const return !table_ttl.group_by_ttl.empty(); } +TTLDescriptions StorageInMemoryMetadata::getExportTTLs() const +{ + return table_ttl.export_ttl; +} + +bool StorageInMemoryMetadata::hasAnyExportTTL() const +{ + return !table_ttl.export_ttl.empty(); +} + ColumnDependencies StorageInMemoryMetadata::getColumnDependencies( const NameSet & updated_columns, bool include_ttl_target, diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h index aa577f8d02ff..c32bbd2a69c2 100644 --- 
a/src/Storages/StorageInMemoryMetadata.h +++ b/src/Storages/StorageInMemoryMetadata.h @@ -188,6 +188,11 @@ struct StorageInMemoryMetadata TTLDescriptions getGroupByTTLs() const; bool hasAnyGroupByTTL() const; + /// Just wrapper for table TTLs, return EXPORT TTL rules. + /// One entry per destination table for Replicated*MergeTree. + TTLDescriptions getExportTTLs() const; + bool hasAnyExportTTL() const; + using HasDependencyCallback = std::function; /// Returns columns, which will be needed to calculate dependencies (skip indices, projections, diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 750099f1b717..cf18ea787ec5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -49,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -214,7 +215,6 @@ namespace Setting extern const SettingsBool allow_experimental_export_merge_tree_part; extern const SettingsBool export_merge_tree_partition_force_export; extern const SettingsUInt64 export_merge_tree_partition_max_retries; - extern const SettingsUInt64 export_merge_tree_partition_manifest_ttl; extern const SettingsUInt64 export_merge_tree_partition_task_timeout_seconds; extern const SettingsBool output_format_parallel_formatting; extern const SettingsBool output_format_parquet_parallel_encoding; @@ -236,6 +236,7 @@ namespace Setting namespace MergeTreeSetting { extern const MergeTreeSettingsBool allow_experimental_replacing_merge_with_cleanup; + extern const MergeTreeSettingsBool allow_inserts_into_exported_partition; extern const MergeTreeSettingsBool allow_remote_fs_zero_copy_replication; extern const MergeTreeSettingsBool always_use_copy_instead_of_hardlinks; extern const MergeTreeSettingsBool assign_part_uuids; @@ -343,6 +344,7 @@ namespace ErrorCodes extern const int PENDING_MUTATIONS_NOT_ALLOWED; extern const int EXPORT_PARTITION_ALREADY_EXPORTED; extern const int 
PARTITION_EXPORT_FAILED; + extern const int EXPORT_PARTITION_BACKFILL_NOT_ALLOWED; } namespace ServerSetting @@ -548,6 +550,17 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( export_merge_tree_partition_select_task->deactivate(); } + /// TTL EXPORT task is created unconditionally so that `TTL EXPORT` rules in the metadata are + /// always honored: the parser, metadata, system-table column, and marker cache are always + /// available. When `allow_experimental_export_merge_tree_partition` is OFF the task simply + /// catches the SUPPORT_IS_DISABLED exception from `exportPartitionToTableWithOrigin` and reschedules. + export_merge_tree_partition_ttl_task = std::make_shared<ExportPartitionTTLTask>(*this); + export_merge_tree_partition_ttl_schedule = getContext()->getSchedulePool().createTask( + getStorageID(), + getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::export_merge_tree_partition_ttl_task)", + [this] { runExportPartitionTTLTask(); }); + export_merge_tree_partition_ttl_schedule->deactivate(); + bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_info.zookeeper_name); auto component_guard = Coordination::setCurrentComponent("StorageReplicatedMergeTree::StorageReplicatedMergeTree"); @@ -4383,7 +4396,9 @@ void StorageReplicatedMergeTree::mergeSelectingTask() /*merge_constraints=*/{{max_source_parts_bytes_for_merge, max_result_part_rows}}, /*merge_with_ttl_allowed=*/merge_with_ttl_allowed, /*aggressive_=*/false, - /*range_filter_=*/nullptr + /*range_filter_=*/nullptr, + /*ttl_drop_delete_partition_filter_=*/ + [this](const String & pid) { return isPartitionAllowedForTTLDropDelete(pid); } )); if (partitions_to_merge_in.empty()) @@ -4402,7 +4417,9 @@ void StorageReplicatedMergeTree::mergeSelectingTask() /*merge_constraints=*/{{max_source_parts_bytes_for_merge, max_result_part_rows}}, /*merge_with_ttl_allowed=*/merge_with_ttl_allowed, /*aggressive_=*/false, - /*range_filter_=*/nullptr + /*range_filter_=*/nullptr, + 
/*ttl_drop_delete_partition_filter_=*/ + [this](const String & pid) { return isPartitionAllowedForTTLDropDelete(pid); } ), partitions_to_merge_in); @@ -4656,6 +4673,202 @@ std::vector StorageReplicatedMergeTree::getPartit return export_merge_tree_partition_manifest_updater->getPartitionExportsInfo(); } +void StorageReplicatedMergeTree::runExportPartitionTTLTask() +{ + auto component_guard = Coordination::setCurrentComponent("StorageReplicatedMergeTree::runExportPartitionTTLTask"); + std::chrono::milliseconds delay{60'000}; + try + { + if (export_merge_tree_partition_ttl_task) + delay = export_merge_tree_partition_ttl_task->run(); + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } + + export_merge_tree_partition_ttl_schedule->scheduleAfter(static_cast(delay.count())); +} + +QualifiedTableName StorageReplicatedMergeTree::resolveExportTTLDestination(const TTLDescription & rule) const +{ + /// An omitted `db.` qualifier resolves to the source table's own database. We deliberately use + /// the source's database rather than a session/global current database: it is always set, stable + /// across restarts and replication metadata reloads, and yields a non-empty name that we then + /// feed verbatim into the export manifest, guaranteeing it matches the keys used for cache lookups. + return QualifiedTableName{ + rule.destination_database.empty() ? getStorageID().getDatabaseName() : rule.destination_database, + rule.destination_name}; +} + +std::string StorageReplicatedMergeTree::computeTTLExportMarkerLocked(const QualifiedTableName & destination) const +{ + /// Caller holds `export_merge_tree_partition_mutex`. Linear scan over the entries + /// container, which is bounded by the number of in-flight + historical export tasks + /// for this storage (a small number in practice). 
+ std::string max_pid; + const auto & partition_key = getInMemoryMetadataPtr()->getPartitionKey(); + const bool can_compare = MergeTreePartitionTopBoundary::isPartitionExpressionSupported(partition_key); + for (const auto & entry : export_merge_tree_partition_task_entries_by_key) + { + if (entry.manifest.origin != ExportReplicatedMergeTreePartitionOrigin::TTL) + continue; + if (entry.manifest.destination_database != destination.database + || entry.manifest.destination_table != destination.table) + continue; + if (max_pid.empty()) + { + max_pid = entry.manifest.partition_id; + continue; + } + if (can_compare + && MergeTreePartitionTopBoundary::comparePartitionIds(partition_key, entry.manifest.partition_id, max_pid) > 0) + { + max_pid = entry.manifest.partition_id; + } + } + return max_pid; +} + +bool StorageReplicatedMergeTree::isPartitionAllowedForTTLDropDelete(const String & partition_id) const +{ + /// Fast-path: no TTL EXPORT rules at all → drop / delete proceeds freely. + auto metadata = getInMemoryMetadataPtr(); + if (!metadata->hasAnyExportTTL()) + return true; + + const auto & export_ttls = metadata->getExportTTLs(); + + std::lock_guard lock(export_merge_tree_partition_mutex); + /// For every TTL EXPORT destination there must be a COMPLETED entry for `partition_id`, + /// otherwise this partition is "owned by ClickHouse" pending export and we must defer the + /// destructive TTL merge. Failed / killed entries do NOT satisfy the requirement — the user + /// must explicitly resolve them (force_export) before TTL DROP can proceed. 
+ for (const auto & rule : export_ttls) + { + const QualifiedTableName destination = resolveExportTTLDestination(rule); + const auto composite_key = partition_id + "_" + destination.getFullName(); + auto it = export_merge_tree_partition_task_entries_by_key.find(composite_key); + if (it == export_merge_tree_partition_task_entries_by_key.end()) + return false; + if (it->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::COMPLETED) + return false; + } + return true; +} + +void StorageReplicatedMergeTree::validateExportTTLDestinations(const StorageInMemoryMetadata & metadata, ContextPtr query_context) const +{ + if (!metadata.hasAnyExportTTL()) + return; + + /// The PARTITION BY whitelist is enforced at `registerStorageMergeTree` time, but we re-check + /// here so MODIFY TTL on a table whose partition key was somehow already incompatible still + /// fails loudly rather than silently breaking the scheduler. + MergeTreePartitionTopBoundary::checkPartitionExpressionSupported(metadata.getPartitionKey()); + + auto query_to_string = [](const ASTPtr & ast) + { + return ast ? ast->formatWithSecretsOneLine() : ""; + }; + + const auto src_partition_ast = metadata.getPartitionKeyAST(); + const auto & src_columns = metadata.getColumns(); + + for (const auto & rule : metadata.getExportTTLs()) + { + const QualifiedTableName destination = resolveExportTTLDestination(rule); + const String & dest_database = destination.database; + const String & dest_table = destination.table; + + StoragePtr dest_storage; + try + { + dest_storage = DatabaseCatalog::instance().tryGetTable({dest_database, dest_table}, query_context); + } + catch (...) + { + /// Destination database / catalog not available right now — defer to runtime. + LOG_INFO(log.load(), "Skipping TTL EXPORT destination validation for {}.{}: catalog unavailable. 
Will re-check at runtime.", + dest_database, dest_table); + continue; + } + + if (!dest_storage) + { + LOG_INFO(log.load(), "TTL EXPORT destination {}.{} does not exist yet; deferring schema validation to runtime.", + dest_database, dest_table); + continue; + } + + if (dest_storage->getStorageID() == this->getStorageID()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "TTL EXPORT destination must differ from the source table ({}.{})", + dest_database, dest_table); + } + + if (!dest_storage->supportsImport(query_context)) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "TTL EXPORT destination {}.{} (engine {}) does not support MergeTree parts import", + dest_database, dest_table, dest_storage->getName()); + } + + auto dest_snapshot = dest_storage->getInMemoryMetadataPtr(); + if (src_columns.getReadable().sizeOfDifference(dest_snapshot->getColumns().getInsertable())) + { + throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, + "TTL EXPORT source and destination {}.{} have different structure", + dest_database, dest_table); + } + + /// Iceberg-spec partitioning is verified at runtime by `verifyIcebergPartitionCompatibility` + /// which requires loading Iceberg metadata; we only do the cheap AST compare here. 
+ if (!dest_storage->isDataLake()) + { + if (query_to_string(src_partition_ast) != query_to_string(dest_snapshot->getPartitionKeyAST())) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "TTL EXPORT source and destination {}.{} have different partition keys", + dest_database, dest_table); + } + } + } +} + +void StorageReplicatedMergeTree::checkInsertAllowedByExportTTLMarker(const String & partition_id) const +{ + auto metadata = getInMemoryMetadataPtr(); + if (!metadata->hasAnyExportTTL()) + return; + if ((*getSettings())[MergeTreeSetting::allow_inserts_into_exported_partition]) + return; + + const auto & partition_key = metadata->getPartitionKey(); + if (!MergeTreePartitionTopBoundary::isPartitionExpressionSupported(partition_key)) + return; + + /// Derive the marker for every TTL EXPORT destination on demand from the manifest cache. + /// The cache size is bounded by the number of in-flight + historical export tasks. + std::lock_guard lock(export_merge_tree_partition_mutex); + for (const auto & rule : metadata->getExportTTLs()) + { + const QualifiedTableName destination = resolveExportTTLDestination(rule); + const auto marker = computeTTLExportMarkerLocked(destination); + if (marker.empty()) + continue; + if (MergeTreePartitionTopBoundary::comparePartitionIds(partition_key, partition_id, marker) <= 0) + { + throw Exception(ErrorCodes::EXPORT_PARTITION_BACKFILL_NOT_ALLOWED, + "Cannot insert into partition_id '{}': it is at or below the TTL EXPORT marker '{}' " + "for destination {}.{}. The partition is owned by the destination storage. 
" + "To override, set `allow_inserts_into_exported_partition = 1`.", + partition_id, marker, destination.database, destination.table); + } + } +} + StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMergeParts( zkutil::ZooKeeperPtr & zookeeper, const DataPartsVector & parts, @@ -6057,6 +6270,9 @@ void StorageReplicatedMergeTree::partialShutdown() export_merge_tree_partition_status_handling_task->deactivate(); } + if (export_merge_tree_partition_ttl_schedule) + export_merge_tree_partition_ttl_schedule->deactivate(); + cleanup_thread.stop(); deduplication_hashes_cache.stop(); async_block_ids_cache.stop(); @@ -8281,6 +8497,15 @@ void StorageReplicatedMergeTree::fetchPartition( } void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context) +{ + /// Public override: ALTER-originated entry. + exportPartitionToTableWithOrigin(command, query_context, ExportReplicatedMergeTreePartitionOrigin::ALTER); +} + +void StorageReplicatedMergeTree::exportPartitionToTableWithOrigin( + const PartitionCommand & command, + ContextPtr query_context, + ExportReplicatedMergeTreePartitionOrigin origin) { auto component_guard = Coordination::setCurrentComponent("StorageReplicatedMergeTree::exportPartitionToTable"); if (!query_context->getServerSettings()[ServerSetting::allow_experimental_export_merge_tree_partition]) @@ -8322,7 +8547,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & try { - exportPartitionToTable(sub, query_context); + exportPartitionToTableWithOrigin(sub, query_context, origin); } catch (const Exception & e) { @@ -8416,36 +8641,13 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists); if (zookeeper->exists(partition_exports_path)) { - LOG_INFO(log, "Export with key {} is already exported or it is being exported. 
Checking if it has expired so that we can overwrite it", export_key); - - bool has_expired = false; - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists); - if (zookeeper->exists(fs::path(partition_exports_path) / "metadata.json")) - { - std::string metadata_json; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - if (zookeeper->tryGet(fs::path(partition_exports_path) / "metadata.json", metadata_json)) - { - const auto manifest = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); - - const auto now = time(nullptr); - const auto expiration_time = manifest.create_time + manifest.ttl_seconds; - - LOG_INFO(log, "Export with key {} has expiration time {}, now is {}", export_key, expiration_time, now); - - if (static_cast(expiration_time) < now) - { - has_expired = true; - } - } - } - - if (!has_expired && !query_context->getSettingsRef()[Setting::export_merge_tree_partition_force_export]) + /// `system.replicated_partition_exports` is an append-only history; entries never expire. + /// The only way to overwrite an existing entry is `export_merge_tree_partition_force_export`. + if (!query_context->getSettingsRef()[Setting::export_merge_tree_partition_force_export]) { - throw Exception(ErrorCodes::EXPORT_PARTITION_ALREADY_EXPORTED, "Export with key {} already exported or it is being exported, and it has not expired. Set `export_merge_tree_partition_force_export` to overwrite it.", export_key); + throw Exception(ErrorCodes::EXPORT_PARTITION_ALREADY_EXPORTED, + "Export with key {} already exists (history is append-only). 
Set `export_merge_tree_partition_force_export` to overwrite it.", + export_key); } LOG_INFO(log, "Overwriting export with key {}", export_key); @@ -8527,8 +8729,8 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.parts = part_names; manifest.create_time = time(nullptr); manifest.max_retries = query_context->getSettingsRef()[Setting::export_merge_tree_partition_max_retries]; - manifest.ttl_seconds = query_context->getSettingsRef()[Setting::export_merge_tree_partition_manifest_ttl]; manifest.task_timeout_seconds = query_context->getSettingsRef()[Setting::export_merge_tree_partition_task_timeout_seconds]; + manifest.origin = origin; manifest.max_threads = query_context->getSettingsRef()[Setting::max_threads]; manifest.parallel_formatting = query_context->getSettingsRef()[Setting::output_format_parallel_formatting]; manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding]; diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 9f905cfaa7db..9e240c906388 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -403,6 +404,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData friend class ReplicatedMergeMutateTaskBase; friend class ExportPartitionManifestUpdatingTask; friend class ExportPartitionTaskScheduler; + friend class ExportPartitionTTLTask; using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker; using LogEntry = ReplicatedMergeTreeLogEntry; @@ -525,7 +527,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData Coordination::WatchCallbackPtr export_merge_tree_partition_watch_callback; - std::mutex export_merge_tree_partition_mutex; + mutable std::mutex export_merge_tree_partition_mutex; BackgroundSchedulePoolTaskHolder 
export_merge_tree_partition_select_task; @@ -535,6 +537,36 @@ class StorageReplicatedMergeTree final : public MergeTreeData ExportPartitionTaskEntriesContainer::index::type & export_merge_tree_partition_task_entries_by_key; ExportPartitionTaskEntriesContainer::index::type & export_merge_tree_partition_task_entries_by_transaction_id; ExportPartitionTaskEntriesContainer::index::type & export_merge_tree_partition_task_entries_by_create_time; + + /// Compute the "TTL export marker" for a destination on demand by scanning the manifest cache. + /// The marker is the highest partition_id (numerically, per the curated partition expression) + /// of any `origin=TTL` entry, regardless of status. Returns an empty string when no such entry + /// exists. Caller must hold `export_merge_tree_partition_mutex`. + /// + /// "Any status" is intentional: + /// - Inserts older than the marker must be rejected even if the corresponding TTL export was + /// KILLED/FAILED (those entries persist forever in the append-only history; the user must + /// explicitly `force_export` to retry, and the marker stays put until then). + /// - The TTL task uses the marker to skip past already-attempted partitions. + std::string computeTTLExportMarkerLocked(const QualifiedTableName & destination) const; + + /// Resolve the destination of a `TTL EXPORT` rule to a fully-qualified table name. An empty + /// `destination_database` (when the `TO TABLE` clause omits the `db.` qualifier) is resolved to + /// the source table's own database. This must be the single resolution point shared by every + /// consumer (the TTL task that schedules exports, the marker computation, insert blocking and + /// TTL DROP/DELETE deferral) so that the database written into the export manifest always matches + /// the database used for cache lookups. 
Resolving against the source table's database (rather + /// than a session/global current database) keeps the result stable across restarts and + /// replication metadata reloads, where no current database is reliably set. + QualifiedTableName resolveExportTTLDestination(const TTLDescription & rule) const; + + /// Background task that translates `TTL EXPORT` rules in the metadata into export entries. + /// Created unconditionally (not gated by `allow_experimental_export_merge_tree_partition`). + /// When the experimental flag is off, the task's call to `exportPartitionToTable` throws + /// and is caught/logged inside the task. When the flag is on, it goes through the same + /// pipeline as `ALTER ... EXPORT PARTITION`. + std::shared_ptr export_merge_tree_partition_ttl_task; + BackgroundSchedulePoolTaskHolder export_merge_tree_partition_ttl_schedule; /// A thread that removes old parts, log entries, and blocks. ReplicatedMergeTreeCleanupThread cleanup_thread; @@ -775,6 +807,10 @@ class StorageReplicatedMergeTree final : public MergeTreeData /// handle status changes for export partition tasks void exportMergeTreePartitionStatusHandlingTask(); + /// Drive `ExportPartitionTTLTask` and reschedule it. Always active; tolerates the + /// experimental flag being off (the task itself catches the gating exception). + void runExportPartitionTTLTask(); + /** Write the selected parts to merge into the log, * Call when merge_selecting_mutex is locked. * Returns false if any part is not in ZK. @@ -968,6 +1004,34 @@ class StorageReplicatedMergeTree final : public MergeTreeData void exportPartitionToTable(const PartitionCommand &, ContextPtr) override; + /// Schedule an export partition entry with an explicit `origin`. Public override above + /// delegates here with `ALTER`. The TTL background task calls this directly with `TTL`. 
+ void exportPartitionToTableWithOrigin( + const PartitionCommand & command, + ContextPtr query_context, + ExportReplicatedMergeTreePartitionOrigin origin); + +public: + /// Returns true when destructive TTL merges (TTL DROP / TTL DELETE / MATERIALIZE TTL drops) + /// are allowed to proceed for the given partition_id. False when there is at least one TTL + /// EXPORT destination that has not yet COMPLETED an export of this partition. Used by the + /// merge selectors to defer destructive TTL until export ownership has transferred. + bool isPartitionAllowedForTTLDropDelete(const String & partition_id) const; + + /// Best-effort schema validation for TTL EXPORT destinations. Throws on hard mismatches + /// (column structure, partition key) when the destination is currently reachable. When the + /// destination does not yet exist or cannot be inspected (e.g. external storage offline), + /// the check is deferred to the runtime path inside `exportPartitionToTable`. + void validateExportTTLDestinations(const StorageInMemoryMetadata & metadata, ContextPtr query_context) const; + + /// Throws `EXPORT_PARTITION_BACKFILL_NOT_ALLOWED` when `partition_id` is at or below the TTL + /// EXPORT marker of any destination. Used by `MergeTreeDataWriter` to reject backfills into + /// partitions whose data ownership has been transferred to a destination storage. No-op when + /// the table has no TTL EXPORT rules or when `allow_inserts_into_exported_partition` is set. + void checkInsertAllowedByExportTTLMarker(const String & partition_id) const; + +private: + /// NOTE: there are no guarantees for concurrent merges. Dropping part can /// be concurrently merged into some covering part and dropPart will do /// nothing. There are some fundamental problems with it. 
But this is OK diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp index 9e8faab689d6..0cda133e88ea 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp @@ -43,6 +43,9 @@ ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio {"parts_count", std::make_shared(), "Number of parts in the export."}, {"parts_to_do", std::make_shared(), "Number of parts pending to be exported."}, {"status", std::make_shared(), "Status of the export."}, + {"origin", std::make_shared(), + "Provenance of the entry: 'ALTER' for entries scheduled by `ALTER TABLE ... EXPORT PARTITION`, " + "'TTL' for entries scheduled by the background `TTL EXPORT` task."}, {"last_exception_per_replica", std::make_shared(last_exception_tuple), "Per-replica last exception entries. Each tuple records the most recent exception observed by that replica plus a best-effort within-replica count. 
Empty array if no replica has reported an exception for this task."}, {"exception_count", std::make_shared(), @@ -145,6 +148,7 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu res_columns[i++]->insert(info.parts_count); res_columns[i++]->insert(info.parts_to_do); res_columns[i++]->insert(info.status); + res_columns[i++]->insert(info.origin); Array per_replica; per_replica.reserve(info.last_exception_per_replica.size()); diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.h b/src/Storages/System/StorageSystemReplicatedPartitionExports.h index a8666374a7f0..4cfe2571fd4d 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.h +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.h @@ -21,6 +21,8 @@ struct ReplicatedPartitionExportInfo size_t parts_to_do; std::vector parts; String status; + /// Provenance: 'ALTER' (user-issued) or 'TTL' (background TTL task). + String origin; /// One entry per replica that has recorded at least one exception for this task. /// Sourced verbatim from the in-memory mirror; no ZooKeeper traffic. 
std::vector last_exception_per_replica; diff --git a/src/Storages/TTLDescription.cpp b/src/Storages/TTLDescription.cpp index 1bb3587b28c2..3a67cdf6f8cc 100644 --- a/src/Storages/TTLDescription.cpp +++ b/src/Storages/TTLDescription.cpp @@ -21,7 +21,9 @@ #include #include #include +#include #include +#include #include #include @@ -133,9 +135,12 @@ TTLDescription::TTLDescription(const TTLDescription & other) , set_parts(other.set_parts) , aggregate_descriptions(other.aggregate_descriptions) , destination_type(other.destination_type) + , destination_database(other.destination_database) , destination_name(other.destination_name) , if_exists(other.if_exists) , recompression_codec(other.recompression_codec) + , export_interval_kind(other.export_interval_kind) + , export_interval_count(other.export_interval_count) { } @@ -164,8 +169,11 @@ TTLDescription & TTLDescription::operator=(const TTLDescription & other) set_parts = other.set_parts; aggregate_descriptions = other.aggregate_descriptions; destination_type = other.destination_type; + destination_database = other.destination_database; destination_name = other.destination_name; if_exists = other.if_exists; + export_interval_kind = other.export_interval_kind; + export_interval_count = other.export_interval_count; if (other.recompression_codec) recompression_codec = other.recompression_codec->clone(); @@ -223,6 +231,50 @@ TTLDescription TTLDescription::getTTLFromAST( TTLDescription result; const auto * ttl_element = definition_ast->as(); + /// EXPORT TTLs are partition-level and column-less. The "expression" stored on the AST is + /// an interval (e.g. `INTERVAL 30 DAY`), not a date-typed expression over row columns, so it + /// must skip the usual `checkTTLExpression` pipeline that demands a Date-typed result column. 
+ if (ttl_element != nullptr && ttl_element->mode == TTLMode::EXPORT) + { + result.mode = ttl_element->mode; + result.destination_type = ttl_element->destination_type; + result.destination_database = ttl_element->destination_database; + result.destination_name = ttl_element->destination_name; + result.expression_ast = ttl_element->children.front()->clone(); + + checkExpressionDoesntContainSubqueries(*result.expression_ast); + + auto interval_ast = result.expression_ast->clone(); + EvaluateConstantExpressionResult eval_result; + try + { + eval_result = evaluateConstantExpression(interval_ast, context); + } + catch (...) + { + throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, + "EXPORT TTL interval must be a constant interval expression (e.g. `INTERVAL 30 DAY`)"); + } + + const auto * interval_type = typeid_cast(eval_result.second.get()); + if (!interval_type) + throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, + "EXPORT TTL expression must have Interval type, got {}", eval_result.second->getName()); + + const Int64 value = eval_result.first.safeGet(); + if (value <= 0) + throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, + "EXPORT TTL interval must be positive, got {}", value); + + result.export_interval_kind = interval_type->getKind(); + result.export_interval_count = value; + + /// Partition-key compatibility and destination-table validation are deferred to the storage layer + /// (StorageReplicatedMergeTree), which has access to `metadata.partition_key` and can resolve the + /// destination via DatabaseCatalog. 
+ return result; + } + /// First child is expression: `TTL expr TO DISK` if (ttl_element != nullptr) result.expression_ast = ttl_element->children.front()->clone(); @@ -365,6 +417,7 @@ TTLTableDescription::TTLTableDescription(const TTLTableDescription & other) , move_ttl(other.move_ttl) , recompression_ttl(other.recompression_ttl) , group_by_ttl(other.group_by_ttl) + , export_ttl(other.export_ttl) { } @@ -383,6 +436,7 @@ TTLTableDescription & TTLTableDescription::operator=(const TTLTableDescription & move_ttl = other.move_ttl; recompression_ttl = other.recompression_ttl; group_by_ttl = other.group_by_ttl; + export_ttl = other.export_ttl; return *this; } @@ -427,6 +481,21 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST( { result.group_by_ttl.emplace_back(std::move(ttl)); } + else if (ttl.mode == TTLMode::EXPORT) + { + /// Disallow duplicate EXPORT rules with the same destination — the marker is per (src, dst). + for (const auto & existing : result.export_ttl) + { + if (existing.destination_database == ttl.destination_database + && existing.destination_name == ttl.destination_name) + { + throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, + "Duplicate EXPORT TTL clause for destination {}.{}", + ttl.destination_database, ttl.destination_name); + } + } + result.export_ttl.emplace_back(std::move(ttl)); + } else { result.move_ttl.emplace_back(std::move(ttl)); diff --git a/src/Storages/TTLDescription.h b/src/Storages/TTLDescription.h index 59566d67f6ad..d89f05fc3fee 100644 --- a/src/Storages/TTLDescription.h +++ b/src/Storages/TTLDescription.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -88,7 +89,10 @@ struct TTLDescription /// For example DISK or VOLUME DataDestinationType destination_type; - /// Name of destination disk or volume + /// Database of destination table, used only by EXPORT TTLs. 
+ String destination_database; + + /// Name of destination disk, volume or table String destination_name; /// If true, do nothing if DISK or VOLUME doesn't exist . @@ -98,6 +102,13 @@ struct TTLDescription /// Codec name which will be used to recompress data ASTPtr recompression_codec; + /// For EXPORT TTLs: parsed interval as (kind, count). + /// e.g. `EXPORT INTERVAL 30 DAY` -> {Day, 30}. + /// The interval is applied to the partition's top boundary at runtime via proper date arithmetic + /// (variable-length kinds like Month / Year are honored exactly). + IntervalKind export_interval_kind{IntervalKind::Kind::Second}; + Int64 export_interval_count = 0; + /// Parse TTL structure from definition. Able to parse both column and table TTLs. static TTLDescription getTTLFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key, bool is_attach); @@ -131,6 +142,10 @@ struct TTLTableDescription TTLDescriptions group_by_ttl; + /// Exporting data TTL (to destination tables, one entry per destination). + /// Only valid for Replicated*MergeTree. 
+ TTLDescriptions export_ttl; + TTLTableDescription() = default; TTLTableDescription(const TTLTableDescription & other); TTLTableDescription & operator=(const TTLTableDescription & other); diff --git a/src/Storages/TTLMode.h b/src/Storages/TTLMode.h index bbbdbee400ae..92f994a96b80 100644 --- a/src/Storages/TTLMode.h +++ b/src/Storages/TTLMode.h @@ -10,6 +10,7 @@ enum class TTLMode : uint8_t MOVE, GROUP_BY, RECOMPRESS, + EXPORT, }; } diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index 8b49589a3005..aead509ecf06 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -227,6 +227,191 @@ def test_export_partition_all_to_iceberg(cluster): assert count_2021 == 1, f"Expected 1 row for year=2021, got {count_2021}" +def test_ttl_export_partition_to_iceberg(cluster): + """ + Very basic TTL EXPORT happy path: + * ReplicatedMergeTree with `PARTITION BY toYearNumSinceEpoch(d)` and + `TTL EXPORT INTERVAL 1 DAY TO TABLE `. + * The same partition function is used on the Iceberg side; it is the + only year-bucketing transform Iceberg accepts (`getPartitionField` + maps it to the Iceberg "year" transform). + * Rows are inserted into past-year partitions (2020, 2021), whose top + boundary plus 1 day is well in the past, so both partitions are + immediately eligible for the background `ExportPartitionTTLTask`. + * Both partitions must land COMPLETED in + `system.replicated_partition_exports` with `origin = 'TTL'`, and the + data must appear in the Iceberg table. + """ + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_ttl_{uid}" + iceberg_table = f"iceberg_ttl_{uid}" + + # Iceberg destination must exist when the TTL EXPORT clause is validated + # at CREATE time, so create the destination first. 
+ make_iceberg_s3(node, iceberg_table, "id Int64, d Date", partition_by="toYearNumSinceEpoch(d)") + + node.query( + f""" + CREATE TABLE {mt_table} (id Int64, d Date) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{mt_table}', 'r1') + PARTITION BY toYearNumSinceEpoch(d) + ORDER BY tuple() + TTL EXPORT INTERVAL 1 DAY TO TABLE {iceberg_table} + SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1 + """ + ) + + node.query( + f"INSERT INTO {mt_table} VALUES (1, '2020-01-01'), (2, '2020-12-31'), (3, '2021-06-15')" + ) + + # toYearNumSinceEpoch returns (year - 1970), so partition_ids are "50" and "51". + p2020 = "50" + p2021 = "51" + + # The TTL task ticks every 60s by default; give it enough headroom for both partitions. + wait_for_export_status(node, mt_table, iceberg_table, p2020, "COMPLETED", timeout=180) + wait_for_export_status(node, mt_table, iceberg_table, p2021, "COMPLETED", timeout=180) + + origins = node.query( + f""" + SELECT partition_id, origin + FROM system.replicated_partition_exports + WHERE source_table = '{mt_table}' + AND destination_table = '{iceberg_table}' + ORDER BY partition_id + FORMAT TabSeparated + """ + ).strip() + assert origins == f"{p2020}\tTTL\n{p2021}\tTTL", ( + f"Expected both partitions to be exported via TTL, got:\n{origins}" + ) + + count_2020 = int(node.query(f"SELECT count() FROM {iceberg_table} WHERE toYear(d) = 2020").strip()) + count_2021 = int(node.query(f"SELECT count() FROM {iceberg_table} WHERE toYear(d) = 2021").strip()) + assert count_2020 == 2, f"Expected 2 rows for year=2020 in Iceberg, got {count_2020}" + assert count_2021 == 1, f"Expected 1 row for year=2021 in Iceberg, got {count_2021}" + + +def test_ttl_export_staggered_expiry(cluster): + """ + Staggered TTL EXPORT across four hourly partitions, each with three data parts. + + `PARTITION BY toRelativeHourNum(dt)` gives one partition per hour (the Iceberg + "hours" transform), and `top_boundary = raw * 3600 + 3599` in UTC epoch seconds. 
+ The TTL task compares `top_boundary + interval` against the current UTC time, so + we derive a per-test `INTERVAL N SECOND` from a single server-`now` snapshot to + place the four partitions' expiry windows precisely: + + * h1 (3h ago), h2 (2h ago): already expired -> exported immediately + * h3 (1h ago): expires ~30s after `now` -> exported once its window elapses + * h4 (current hour): expires ~1h later -> never exported during the test + + The background task exports one partition per (src, dst) per tick and keeps only + one PENDING at a time, so partitions land COMPLETED sequentially as the marker + advances. We verify ordering, `origin = 'TTL'`, the exported row count, and that + h4 is never picked up. + """ + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_ttl_stag_{uid}" + iceberg_table = f"iceberg_ttl_stag_{uid}" + + # Iceberg destination must exist when the TTL EXPORT clause is validated at + # CREATE time, so create it first with the same hourly partitioning. + make_iceberg_s3(node, iceberg_table, "id Int64, dt DateTime", + partition_by="toRelativeHourNum(dt)") + + # Single server-now snapshot (UTC epoch); all boundaries are derived from it. + server_now = int(node.query("SELECT toUnixTimestamp(now())").strip()) + hour = 3600 + cur = server_now - (server_now % hour) # start of the current hour + secs_into_hour = server_now - cur + delta = 30 # h3 fires ~30s after now + + # INTERVAL N SECOND chosen so fires(h3) = top_boundary(h3) + N = now + delta. + # top_boundary(h3) = (cur - hour) + 3599 = cur - 1, so N = secs_into_hour + delta + 1. + interval_seconds = secs_into_hour + delta + 1 + + h1_start = cur - 3 * hour + h2_start = cur - 2 * hour + h3_start = cur - 1 * hour + h4_start = cur + + # partition_id == hours since epoch for toRelativeHourNum. 
+ p1 = h1_start // hour + p2 = h2_start // hour + p3 = h3_start // hour + p4 = h4_start // hour + + node.query( + f""" + CREATE TABLE {mt_table} (id Int64, dt DateTime) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{mt_table}', 'r1') + PARTITION BY toRelativeHourNum(dt) + ORDER BY tuple() + TTL EXPORT INTERVAL {interval_seconds} SECOND TO TABLE {iceberg_table} + SETTINGS enable_block_number_column = 1, + enable_block_offset_column = 1, + allow_inserts_into_exported_partition = 1 + """ + ) + + # Three separate inserts per partition -> three data parts per partition. + # A fixed offset inside each hour keeps the row in the intended bucket. + next_id = 1 + for h_start in (h1_start, h2_start, h3_start, h4_start): + ts = h_start + 100 + for _ in range(3): + node.query(f"INSERT INTO {mt_table} VALUES ({next_id}, toDateTime({ts}))") + next_id += 1 + + # The first three partitions must export in ascending order as the marker advances. + wait_for_export_status(node, mt_table, iceberg_table, str(p1), "COMPLETED", timeout=180) + wait_for_export_status(node, mt_table, iceberg_table, str(p2), "COMPLETED", timeout=180) + wait_for_export_status(node, mt_table, iceberg_table, str(p3), "COMPLETED", timeout=180) + + origins = node.query( + f""" + SELECT partition_id, origin + FROM system.replicated_partition_exports + WHERE source_table = '{mt_table}' + AND destination_table = '{iceberg_table}' + ORDER BY toUInt64(partition_id) + FORMAT TabSeparated + """ + ).strip() + assert origins == f"{p1}\tTTL\n{p2}\tTTL\n{p3}\tTTL", ( + f"Expected partitions {p1}, {p2}, {p3} exported via TTL in ascending order, got:\n{origins}" + ) + + # h4 is ~1h away from expiring and must not be picked up. Re-check after a short + # delay (longer than the task's idle cadence) to confirm it stays unexported. 
+    for _ in range(2):
+        p4_status = node.query(
+            f"SELECT status FROM system.replicated_partition_exports"
+            f" WHERE source_table = '{mt_table}'"
+            f" AND destination_table = '{iceberg_table}'"
+            f" AND partition_id = '{p4}'"
+        ).strip()
+        assert p4_status == "", (
+            f"Partition {p4} (current hour) must not be exported during the test, got status: {p4_status!r}"
+        )
+        time.sleep(12)
+
+    total = int(node.query(f"SELECT count() FROM {iceberg_table}").strip())
+    assert total == 9, f"Expected 9 exported rows (3 partitions x 3 rows), got {total}"
+
+    count_h4 = int(node.query(
+        f"SELECT count() FROM {iceberg_table}"
+        f" WHERE toRelativeHourNum(dt) = {p4}"
+    ).strip())
+    assert count_h4 == 0, f"Expected 0 rows for the unexpired current-hour partition, got {count_h4}"
+
+
 def test_failure_is_logged_in_system_table(cluster):
     """
     When S3 is unreachable the export must be marked FAILED in
@@ -888,7 +1073,6 @@ def test_export_task_timeout_kills_stuck_pending_task(cluster):
         f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}"
         f" SETTINGS export_merge_tree_partition_task_timeout_seconds = 5,"
         f" export_merge_tree_partition_max_retries = 1000000,"
-        f" export_merge_tree_partition_manifest_ttl = 3600,"
         f" allow_insert_into_iceberg = 1"
     )
 
diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
index 3161e3b67100..93eb070b3876 100644
--- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
+++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
@@ -627,42 +627,6 @@ def test_inject_short_living_failures(cluster):
     assert int(exception_count.strip()) >= 1, "Expected at least one exception"
 
 
-def test_export_ttl(cluster):
-    node = cluster.instances["replica1"]
-
-    postfix = str(uuid.uuid4()).replace("-", "_")
-    mt_table = f"export_ttl_mt_table_{postfix}"
-    s3_table = f"export_ttl_s3_table_{postfix}"
-
-    expiration_time = 3
-
-    create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
-
-    # start export
-    node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_manifest_ttl={expiration_time};")
-
-    # assert that I get an error when trying to export the same partition again, query_and_get_error
-    error = node.query_and_get_error(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table};")
-    assert "Export with key" in error, "Expected error about expired export"
-
-    # wait for the export to finish and for the manifest to expire
-    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
-    time.sleep(expiration_time * 2)
-
-    # assert that the export succeeded, check the commit file
-    assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2020_*', format=LineAsString)") == '1\n', "Export did not succeed"
-
-    # start export again
-    node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}")
-
-    # wait for the export to finish
-    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
-
-    # assert that the export succeeded, check the commit file
-    # there should be two commit files now, one for the first export and one for the second export
-    assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2020_*', format=LineAsString)") == '2\n', "Export did not succeed"
-
-
 def test_export_partition_file_already_exists_policy(cluster):
     node = cluster.instances["replica1"]
 
diff --git a/tests/queries/0_stateless/02995_settings_26_1_6_20001_antalya.tsv b/tests/queries/0_stateless/02995_settings_26_1_6_20001_antalya.tsv
index db165b84de10..db30dfa9cad6 100644
--- a/tests/queries/0_stateless/02995_settings_26_1_6_20001_antalya.tsv
+++ b/tests/queries/0_stateless/02995_settings_26_1_6_20001_antalya.tsv
@@ -453,7 +453,6 @@ export_merge_tree_part_throw_on_pending_mutations	1
 export_merge_tree_part_throw_on_pending_patch_parts	1
 export_merge_tree_partition_force_export	0
 export_merge_tree_partition_lock_inside_the_task	0
-export_merge_tree_partition_manifest_ttl	180
 export_merge_tree_partition_max_retries	3
 export_merge_tree_partition_system_table_prefer_remote_information	1
 external_storage_connect_timeout_sec	10