12 changes: 5 additions & 7 deletions antalya/docs/design/alter-table-export-part-partition.md
@@ -422,12 +422,11 @@ The following notes expand on expected behavior of commands.
every active part of partition `p` across all replicas that host
it; `system.replicated_partition_exports` converges to `COMPLETED`.

-4. Re-issuing the same `EXPORT PARTITION` within
-`export_merge_tree_partition_manifest_ttl` is a no-op (no
-duplicate files) unless `export_merge_tree_partition_force_export = 1`. This
-behavior avoids accidentally exporting the same data twice. Note, however
-that forcing the operation is dangerous if ClickHouse can't clean up the
-previous operation. In this case you'll potentially commit files twice.
+4. `system.replicated_partition_exports` is an append-only history.
+Re-issuing `EXPORT PARTITION` for a key that has any entry (in any
+terminal state) is rejected unless `export_merge_tree_partition_force_export = 1`.
+Forcing the operation is dangerous if ClickHouse can't clean up the
+previous operation — you'll potentially commit files twice.

5. Killing an in-flight partition export via `KILL EXPORT PARTITION`
transitions status to `KILLED` and stops all replicas' contributions.
@@ -481,7 +480,6 @@ The following notes expand on expected behavior of commands.
| `export_merge_tree_part_filename_pattern` | query | `{part_name}_{checksum}` | `String` | both | Filename template; supports `{part_name}`, `{checksum}`, `{database}`, `{table}`, server macros. |
| `export_merge_tree_partition_force_export` | query | `false` | `Bool` | `EXPORT PARTITION` | Overwrite a live Keeper manifest for the same `(source, destination, partition_id)`. Dangerous — can produce duplicate data on the destination; use with caution. |
| `export_merge_tree_partition_max_retries` | query | `3` | `UInt64` | `EXPORT PARTITION` | Retry budget applied to both per-part export attempts and per-task commit attempts (Iceberg). The task fails terminally if commit retries alone exceed the budget. |
-| `export_merge_tree_partition_manifest_ttl` | query | `180` (seconds) | `UInt64` | `EXPORT PARTITION` | Live-manifest TTL; acts as the idempotency window. Does not interrupt in-flight tasks. Keep this greater than `export_merge_tree_partition_task_timeout_seconds` if you want the `KILLED` entry to remain visible in `system.replicated_partition_exports` after the timeout fires. |
| `export_merge_tree_partition_task_timeout_seconds` | query | `3600` (seconds) | `UInt64` (`0`=disable) | `EXPORT PARTITION` | Wall-clock cap for `PENDING` tasks; on expiry transitions to `KILLED` with a timeout reason. Measured from manifest `create_time`. Enforcement latency ≈ one manifest-updater poll cycle (~30s) plus Keeper watch propagation. |
| `export_merge_tree_partition_system_table_prefer_remote_information` | query | `false` | `Bool` | `EXPORT PARTITION` | When `true`, `system.replicated_partition_exports` fetches fresh state from Keeper (requires the `MULTI_READ` feature flag); when `false`, uses local cached state. **Default flipped from `true` to `false` in this release** — Keeper round-trips were more expensive than warranted for the typical observability workload. (See NOTE 2.)|
| `export_merge_tree_part_file_already_exists_policy` | query | `skip` | `skip` / `error` / `overwrite` | `EXPORT PARTITION` | Per-file policy during partition export. |
72 changes: 64 additions & 8 deletions docs/en/antalya/partition_export.md
@@ -59,20 +59,14 @@ TO TABLE [destination_database.]destination_table

- **Type**: `Bool`
- **Default**: `false`
-- **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition to the same destination before the manifest expires. **IMPORTANT:** this is dangerous because it can lead to duplicated data, use it with caution.
+- **Description**: Overwrite an existing entry in `system.replicated_partition_exports` for the same `(source, destination, partition_id)`. This is required because the system table is an append-only history; the only way to re-export the same partition to the same destination is to set this flag. **IMPORTANT:** this is dangerous because it can lead to duplicated data, use it with caution.

#### `export_merge_tree_partition_max_retries` (Optional)

- **Type**: `UInt64`
- **Default**: `3`
- **Description**: Maximum number of retries for exporting a merge tree part in an export partition task. If it exceeds, the entire task fails.

-#### `export_merge_tree_partition_manifest_ttl` (Optional)
-
-- **Type**: `UInt64`
-- **Default**: `180` (seconds)
-- **Description**: Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination. This setting does not affect or delete in-progress tasks; it only cleans up completed ones.
-
#### `export_merge_tree_part_file_already_exists_policy` (Optional)

- **Type**: `MergeTreePartExportFileAlreadyExistsPolicy`
@@ -109,7 +103,7 @@ When the timeout is exceeded the task transitions to KILLED (same terminal state

Notes:
- Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation.
-- Since both this timeout and `export_merge_tree_partition_manifest_ttl` are measured from `create_time`, keep `export_merge_tree_partition_manifest_ttl` greater than `export_merge_tree_partition_task_timeout_seconds` if you want the KILLED entry to remain visible in `system.replicated_partition_exports` after the timeout fires.
+- `system.replicated_partition_exports` is an append-only history: terminal entries (`COMPLETED` / `FAILED` / `KILLED`) are never automatically removed.

## Examples

@@ -205,6 +199,68 @@ Status values include:
- `FAILED` - Export failed
- `KILLED` - Export was cancelled

## TTL EXPORT

Replicated*MergeTree tables can drive `EXPORT PARTITION` automatically through
the `TTL EXPORT` clause. Once a partition's *top boundary* plus the configured
interval lies in the past, a background task on the table schedules an export
to the destination just as if the operator had run `ALTER TABLE ... EXPORT
PARTITION`.

### Syntax

```sql
CREATE TABLE rmt_table (id UInt64, d Date)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/rmt_table', 'r1')
PARTITION BY toYearNumSinceEpoch(d)
ORDER BY tuple()
TTL EXPORT INTERVAL 30 DAY TO TABLE iceberg_table;
```

Multiple `TTL EXPORT` clauses are allowed on the same table; each rule
maintains its own progress against its destination.

### Eligibility

A partition is eligible when

```
top_boundary(partition_id) + INTERVAL <= now()
```

`top_boundary` is the *inclusive supremum* of the time range that the
partition can hold. For example, a partition keyed by `toYear(d) = 2020`
has a top boundary of `2020-12-31 23:59:59`.
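
The eligibility rule above can be sketched in C++ for the `toYear` case. This is an illustration under stated assumptions, not the PR's implementation: `daysToJan1`, `topBoundaryOfYear` and `isEligible` are hypothetical names, timestamps are Unix seconds, and the boundary is assumed to be evaluated in UTC (the text does not state a time zone).

```cpp
#include <cassert>

// Hypothetical helper (not from the PR): days from 1970-01-01 to
// January 1 of year `y` in the proleptic Gregorian calendar.
long long daysToJan1(long long y)
{
    // Number of leap years strictly before year `year`.
    auto leapsBefore = [](long long year)
    {
        return (year - 1) / 4 - (year - 1) / 100 + (year - 1) / 400;
    };
    return 365 * (y - 1970) + leapsBefore(y) - leapsBefore(1970);
}

// Inclusive top boundary of a partition keyed by toYear(d) = y:
// the last second of year `y`, as Unix seconds (UTC is an assumption).
long long topBoundaryOfYear(long long y)
{
    return daysToJan1(y + 1) * 86400 - 1;
}

// Eligibility rule from the text: top_boundary + INTERVAL <= now().
bool isEligible(long long top_boundary, long long interval_seconds, long long now)
{
    return top_boundary + interval_seconds <= now;
}
```

Under these assumptions, `topBoundaryOfYear(2020)` corresponds to `2020-12-31 23:59:59` UTC, and with `INTERVAL 30 DAY` the partition becomes eligible exactly 30 days after that instant, not earlier.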

### Supported `PARTITION BY` expressions

`TTL EXPORT` requires a closed-form inverse for the partition function. The
curated whitelist is:

| Family | Expressions |
|-----------------------|----------------------------------------------------------------------|
| Identity | `Date`, `Date32`, `DateTime`, `DateTime64` |
| Generic `to*` | `toYear`, `toYYYYMM`, `toYYYYMMDD`, `toMonday`, `toStartOf{Year,Quarter,Month,Week,Day,Hour,Minute}` |
| Iceberg-spec transforms | `toYearNumSinceEpoch`, `toMonthNumSinceEpoch`, `toRelativeDayNum`, `toRelativeHourNum` |

**Pairing with Iceberg destinations:** Apache Iceberg only accepts the four
*Iceberg-spec transforms* above as time-bucketing partition functions
(mapping to the Iceberg `year`, `month`, `days`, `hours` transforms). When
exporting into an Iceberg table both the source `PARTITION BY` and the
destination `PARTITION BY` must use the same one of those functions. The
generic `to*` family is appropriate for non-Iceberg destinations
(e.g. Hive-partitioned object storage).

`icebergTruncate` / `icebergBucket` are not temporal and are not accepted by
`TTL EXPORT`; bare integer columns are also rejected because they carry no
time semantics.

### Origin column

Entries scheduled by the background task carry `origin = 'TTL'` in
`system.replicated_partition_exports`; entries from manual `ALTER ... EXPORT
PARTITION` carry `origin = 'ALTER'`.

## Related Features

- [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated)
3 changes: 2 additions & 1 deletion src/Common/ErrorCodes.cpp
@@ -667,6 +667,7 @@
M(1005, PENDING_MUTATIONS_NOT_ALLOWED) \
M(1006, EXPORT_PARTITION_ALREADY_EXPORTED) \
M(1007, PARTITION_EXPORT_FAILED) \
M(1008, EXPORT_PARTITION_BACKFILL_NOT_ALLOWED) \
/* See END */

#ifdef APPLY_FOR_EXTERNAL_ERROR_CODES
@@ -683,7 +684,7 @@ namespace ErrorCodes
APPLY_FOR_ERROR_CODES(M)
#undef M

-constexpr ErrorCode END = 1007;
+constexpr ErrorCode END = 1008;
ErrorPairHolder values[END + 1]{};

struct ErrorCodesNames
5 changes: 0 additions & 5 deletions src/Core/Settings.cpp
@@ -7534,10 +7534,6 @@ Ignore existing partition export and overwrite the zookeeper entry
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"(
Maximum number of retries for exporting a merge tree part in an export partition task
)", 0) \
-DECLARE(UInt64, export_merge_tree_partition_manifest_ttl, 86400, R"(
-Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination.
-This setting does not affect / delete in progress tasks. It'll only cleanup the completed ones.
-)", 0) \
DECLARE(UInt64, export_merge_tree_partition_task_timeout_seconds, 3600, R"(
Maximum wall-clock duration (in seconds) an export partition task is allowed to remain in the PENDING state before it is auto-killed by the background cleanup loop.
@@ -7546,7 +7542,6 @@ When the timeout is exceeded the task transitions to KILLED (same terminal state

Notes:
- Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation.
-- Since both this timeout and `export_merge_tree_partition_manifest_ttl` are measured from `create_time`, keep `export_merge_tree_partition_manifest_ttl` greater than `export_merge_tree_partition_task_timeout_seconds` if you want the KILLED entry to remain visible in `system.replicated_partition_exports` after the timeout fires.
)", 0) \
DECLARE(MergeTreePartExportFileAlreadyExistsPolicy, export_merge_tree_part_file_already_exists_policy, MergeTreePartExportFileAlreadyExistsPolicy::skip, R"(
Possible values:
5 changes: 3 additions & 2 deletions src/Core/SettingsChangesHistory.cpp
@@ -123,7 +123,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_remote_initiator_cluster", "", "", "New setting."},
// {"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"},
{"export_merge_tree_partition_task_timeout_seconds", 0, 3600, "New setting to control the timeout for export partition tasks."},
-{"export_merge_tree_partition_manifest_ttl", 180, 86400, "Reasonable default for real usage"},
+/// `export_merge_tree_partition_manifest_ttl` was removed: `system.replicated_partition_exports` is
+/// now an append-only history table (entries never expire). See TTL EXPORT support.
});
addSettingsChanges(settings_changes_history, "26.1",
{
@@ -324,7 +325,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
{"export_merge_tree_partition_force_export", false, false, "New setting."},
{"export_merge_tree_partition_max_retries", 3, 3, "New setting."},
-{"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."},
+/// `export_merge_tree_partition_manifest_ttl` removed in later versions (entries never expire).
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
15 changes: 15 additions & 0 deletions src/Parsers/ASTTTLElement.cpp
@@ -32,6 +32,21 @@ ASTPtr ASTTTLElement::clone() const

void ASTTTLElement::formatImpl(WriteBuffer & ostr, const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
if (mode == TTLMode::EXPORT)
{
ostr << "EXPORT ";
auto ttl_expr = ttl();
auto nested_frame = frame;
if (auto * ast_alias = dynamic_cast<ASTWithAlias *>(ttl_expr.get()); ast_alias && !ast_alias->tryGetAlias().empty())
nested_frame.need_parens = true;
ttl_expr->format(ostr, settings, state, nested_frame);
ostr << " TO TABLE ";
if (!destination_database.empty())
ostr << backQuoteIfNeed(destination_database) << ".";
ostr << backQuoteIfNeed(destination_name);
return;
}

auto ttl_expr = ttl();
auto nested_frame = frame;
if (auto * ast_alias = dynamic_cast<ASTWithAlias *>(ttl_expr.get()); ast_alias && !ast_alias->tryGetAlias().empty())
1 change: 1 addition & 0 deletions src/Parsers/ASTTTLElement.h
@@ -15,6 +15,7 @@ class ASTTTLElement : public IAST
public:
TTLMode mode;
DataDestinationType destination_type;
String destination_database;
String destination_name;
bool if_exists = false;

1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
@@ -349,6 +349,7 @@ namespace DB
MR_MACROS(MONTHS, "MONTHS") \
MR_MACROS(MOVE_PART, "MOVE PART") \
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT, "EXPORT") \
MR_MACROS(EXPORT_PART, "EXPORT PART") \
MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \
MR_MACROS(MOVE, "MOVE") \
27 changes: 27 additions & 0 deletions src/Parsers/ExpressionElementParsers.cpp
@@ -44,6 +44,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserExplainQuery.h>
#include <Parsers/parseDatabaseAndTableName.h>

#include <Interpreters/StorageID.h>

@@ -2442,13 +2443,15 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_to_disk(Keyword::TO_DISK);
ParserKeyword s_to_volume(Keyword::TO_VOLUME);
ParserKeyword s_to_table(Keyword::TO_TABLE);
ParserKeyword s_if_exists(Keyword::IF_EXISTS);
ParserKeyword s_delete(Keyword::DELETE);
ParserKeyword s_where(Keyword::WHERE);
ParserKeyword s_group_by(Keyword::GROUP_BY);
ParserKeyword s_set(Keyword::SET);
ParserKeyword s_recompress(Keyword::RECOMPRESS);
ParserKeyword s_codec(Keyword::CODEC);
ParserKeyword s_export(Keyword::EXPORT);
ParserKeyword s_materialize_ttl(Keyword::MATERIALIZE_TTL);
ParserKeyword s_remove_ttl(Keyword::REMOVE_TTL);
ParserKeyword s_modify_ttl(Keyword::MODIFY_TTL);
@@ -2470,6 +2473,30 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| s_modify_ttl.checkWithoutMoving(pos, expected))
return false;

/// EXPORT branch: `EXPORT <interval_expr> TO TABLE [db.]name`.
/// Has no preceding column-based expression; the interval is the only argument.
if (s_export.ignore(pos, expected))
{
ASTPtr interval_expr;
if (!parser_exp.parse(pos, interval_expr, expected))
return false;

if (!s_to_table.ignore(pos, expected))
return false;

String dest_database;
String dest_table;
if (!parseDatabaseAndTableName(pos, expected, dest_database, dest_table))
return false;

auto ttl_element = make_intrusive<ASTTTLElement>(TTLMode::EXPORT, DataDestinationType::TABLE, /*destination_name=*/ String{}, /*if_exists=*/ false);
ttl_element->destination_database = std::move(dest_database);
ttl_element->destination_name = std::move(dest_table);
ttl_element->setTTL(std::move(interval_expr));
node = ttl_element;
return true;
}

ASTPtr ttl_expr;
if (!parser_exp.parse(pos, ttl_expr, expected))
return false;
22 changes: 19 additions & 3 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@@ -152,6 +152,15 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry
}
};

/// Provenance of an export partition manifest.
/// - ALTER: scheduled by `ALTER TABLE ... EXPORT PARTITION ID ...`.
/// - TTL: scheduled by the background TTL EXPORT task.
enum class ExportReplicatedMergeTreePartitionOrigin : uint8_t
{
ALTER,
TTL,
};

struct ExportReplicatedMergeTreePartitionManifest
{
String transaction_id;
@@ -164,7 +173,6 @@ struct ExportReplicatedMergeTreePartitionManifest
std::vector<String> parts;
time_t create_time;
size_t max_retries;
-size_t ttl_seconds;
size_t task_timeout_seconds;
size_t max_threads;
bool parallel_formatting;
@@ -175,6 +183,7 @@ struct ExportReplicatedMergeTreePartitionManifest
String filename_pattern;
bool write_full_path_in_iceberg_metadata = false;
String iceberg_metadata_json;
ExportReplicatedMergeTreePartitionOrigin origin = ExportReplicatedMergeTreePartitionOrigin::ALTER;

std::string toJsonString() const
{
@@ -205,9 +214,9 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("filename_pattern", filename_pattern);
json.set("create_time", create_time);
json.set("max_retries", max_retries);
-json.set("ttl_seconds", ttl_seconds);
json.set("task_timeout_seconds", task_timeout_seconds);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
+json.set("origin", String(magic_enum::enum_name(origin)));
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
@@ -240,7 +249,6 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.parts.push_back(parts_array->getElement<String>(static_cast<unsigned int>(i)));

manifest.create_time = json->getValue<time_t>("create_time");
-manifest.ttl_seconds = json->getValue<size_t>("ttl_seconds");
manifest.task_timeout_seconds = json->getValue<size_t>("task_timeout_seconds");
manifest.max_threads = json->getValue<size_t>("max_threads");
manifest.parallel_formatting = json->getValue<bool>("parallel_formatting");
@@ -262,6 +270,14 @@ struct ExportReplicatedMergeTreePartitionManifest

manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");

if (json->has("origin"))
{
auto parsed = magic_enum::enum_cast<ExportReplicatedMergeTreePartitionOrigin>(json->getValue<String>("origin"));
if (parsed.has_value())
manifest.origin = parsed.value();
/// Older nodes have no `origin` field; default ALTER is the right legacy behavior.
}

return manifest;
}
};
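
The backward-compatible read of `origin` in the hunk above can be illustrated with a standard-library-only sketch. The names here are hypothetical and not part of the PR: `Origin` mirrors `ExportReplicatedMergeTreePartitionOrigin`, `parseOrigin` stands in for `magic_enum::enum_cast`, and `std::optional<std::string>` models a JSON field that may be absent. As in the diff, an unrecognized value falls back to `ALTER` instead of raising an error.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical mirror of ExportReplicatedMergeTreePartitionOrigin.
enum class Origin : unsigned char
{
    ALTER,
    TTL,
};

// Hypothetical stand-in for magic_enum::enum_cast<Origin>(name).
std::optional<Origin> parseOrigin(const std::string & name)
{
    if (name == "ALTER")
        return Origin::ALTER;
    if (name == "TTL")
        return Origin::TTL;
    return std::nullopt;
}

// `field` is std::nullopt when the manifest JSON has no "origin" key,
// which is the case for manifests written before the field existed.
Origin originOrDefault(const std::optional<std::string> & field)
{
    Origin origin = Origin::ALTER; // same default as the struct member
    if (field)
    {
        if (auto parsed = parseOrigin(*field))
            origin = *parsed;
        // Unknown strings are ignored, so the default ALTER is kept.
    }
    return origin;
}
```

Keeping the default on the member and overwriting it only on a successful parse means that old manifests and manifests with unknown `origin` strings take the same path.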
6 changes: 6 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
@@ -56,6 +56,12 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
{
return manifest.create_time;
}

/// Provenance of this entry: ALTER (user) vs TTL (background task).
ExportReplicatedMergeTreePartitionOrigin getOrigin() const
{
return manifest.origin;
}
};

struct ExportPartitionTaskEntryTagByCompositeKey {};