From c9b59d859e6ed1bbd4c7776217a811f1b82d1816 Mon Sep 17 00:00:00 2001 From: SkylerLin <44233950+linguoxuan@users.noreply.github.com> Date: Sat, 21 Mar 2026 17:06:43 +0800 Subject: [PATCH] feat: retry failed transaction commit --- src/iceberg/table.cc | 1 + src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/retry_util_test.cc | 302 ++++++++++++++++++++++++++ src/iceberg/transaction.cc | 78 +++++-- src/iceberg/transaction.h | 6 + src/iceberg/update/pending_update.h | 3 + src/iceberg/update/snapshot_update.cc | 15 +- src/iceberg/util/retry_util.h | 160 ++++++++++++++ 8 files changed, 550 insertions(+), 16 deletions(-) create mode 100644 src/iceberg/test/retry_util_test.cc create mode 100644 src/iceberg/util/retry_util.h diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 2f2753f38..1255871c3 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -87,6 +87,7 @@ Status Table::Refresh() { ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_)); if (metadata_location_ != refreshed_table->metadata_file_location()) { metadata_ = std::move(refreshed_table->metadata_); + metadata_location_ = std::string(refreshed_table->metadata_file_location()); io_ = std::move(refreshed_table->io_); metadata_cache_ = std::make_unique(metadata_.get()); } diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 768e0507e..ef6e57e73 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -115,6 +115,7 @@ add_iceberg_test(util_test endian_test.cc formatter_test.cc location_util_test.cc + retry_util_test.cc string_util_test.cc transform_util_test.cc truncate_util_test.cc diff --git a/src/iceberg/test/retry_util_test.cc b/src/iceberg/test/retry_util_test.cc new file mode 100644 index 000000000..cef533bdd --- /dev/null +++ b/src/iceberg/test/retry_util_test.cc @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/retry_util.h" + +#include + +#include "iceberg/result.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +// -------------------------------------------------------------------------- +// Test: Successful on first attempt — no retries +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, SuccessOnFirstAttempt) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 3, .min_wait_ms = 1, .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + return 42; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 42); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: Retry once then succeed +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, RetryOnceThenSucceed) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 3, .min_wait_ms = 1, .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("transient failure"); + } + return 42; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 42); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(attempts, 2); +} + +// -------------------------------------------------------------------------- +// Test: Max attempts exhausted +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, MaxAttemptsExhausted) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 2, .min_wait_ms = 1, .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + return CommitFailed("always fails"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(call_count, 3); // 1 initial + 2 retries + EXPECT_EQ(attempts, 3); +} + +// -------------------------------------------------------------------------- +// Test: OnlyRetryOn filters correctly +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, OnlyRetryOnFilter) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 3, .min_wait_ms = 1, .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + // Return a non-retryable error + return ValidationFailed("schema conflict"); + }, + &attempts); + + // Should NOT retry because ValidationFailed is not in the retry list + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: OnlyRetryOn retries matching error +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, OnlyRetryOnMatchingError) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 2, .min_wait_ms = 1, .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + if (call_count <= 2) { + return CommitFailed("transient"); + } + return 100; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 100); + EXPECT_EQ(call_count, 3); // 2 failures + 1 success + EXPECT_EQ(attempts, 3); +} + +// -------------------------------------------------------------------------- +// Test: StopRetryOn stops on matching error +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, StopRetryOnMatchingError) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 5, .min_wait_ms = 1, .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .StopRetryOn({ErrorKind::kCommitStateUnknown}) + .Run( + [&]() -> Result { + ++call_count; + return CommitStateUnknown("datacenter on fire"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: Zero retries means only one attempt +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, ZeroRetries) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 0, .min_wait_ms = 1, .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + return CommitFailed("fail"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: MakeCommitRetryRunner has correct configuration +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) { + int call_count = 0; + int32_t attempts = 0; + + // MakeCommitRetryRunner should only retry on kCommitFailed + auto result = MakeCommitRetryRunner(2, 1, 10, 5000) + .Run( + [&]() -> Result { + ++call_count; + // ValidationFailed should not be retried + return ValidationFailed("not retryable"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: MakeCommitRetryRunner retries CommitFailed +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) { + int call_count = 0; + int32_t attempts = 0; + + auto result = MakeCommitRetryRunner(3, 1, 10, 5000) + .Run( + [&]() -> Result { + ++call_count; + if (call_count <= 2) { + return CommitFailed("transient"); + } + return 99; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 99); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +// -------------------------------------------------------------------------- +// Test: OnlyRetryOn with multiple error kinds +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 5, .min_wait_ms = 1, .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .OnlyRetryOn({ErrorKind::kCommitFailed, ErrorKind::kServiceUnavailable}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("conflict"); + } + if (call_count == 2) { + return ServiceUnavailable("server busy"); + } + return 77; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 77); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +// -------------------------------------------------------------------------- +// Test: Default retry (no filter) retries all errors +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, DefaultRetryAllErrors) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 3, .min_wait_ms = 1, .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return IOError("disk full"); + } + if (call_count == 2) { + return ValidationFailed("bad schema"); + } + return 55; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 55); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index bf4ac426b..b181fe46a 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -50,6 +50,7 @@ #include "iceberg/util/checked_cast.h" #include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" +#include "iceberg/util/retry_util.h" namespace iceberg { @@ -346,23 +347,33 @@ Result> Transaction::Commit() { return ctx_->table; } - std::vector> requirements; - switch (ctx_->kind) { - case TransactionKind::kCreate: { - ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates)); - } break; - case TransactionKind::kUpdate: { - ICEBERG_ASSIGN_OR_RAISE( - requirements, - TableRequirements::ForUpdateTable(*ctx_->metadata_builder->base(), updates)); - - } break; + Result> commit_result; + if (!CanRetry()) { + std::vector> requirements; + switch (ctx_->kind) { + case TransactionKind::kCreate: { + ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates)); + } break; + case TransactionKind::kUpdate: { + ICEBERG_ASSIGN_OR_RAISE( + requirements, + TableRequirements::ForUpdateTable(*ctx_->metadata_builder->base(), updates)); + } break; + } + commit_result = + ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, updates); + } else { + const auto& props = ctx_->table->properties(); + int32_t num_retries = props.Get(TableProperties::kCommitNumRetries); + int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs); + int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs); + int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs); + + commit_result = + MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms) + .Run([this]() -> Result> { return CommitOnce(); }); } - // XXX: we should handle commit failure and retry here. - auto commit_result = - ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, updates); - for (const auto& update : pending_updates_) { std::ignore = update->Finalize(commit_result.has_value() ? std::nullopt @@ -378,6 +389,43 @@ Result> Transaction::Commit() { return ctx_->table; } +Result> Transaction::CommitOnce() { + auto refresh_result = ctx_->table->Refresh(); + if (!refresh_result.has_value()) { + return std::unexpected(refresh_result.error()); + } + + if (ctx_->metadata_builder->base() != ctx_->table->metadata().get()) { + ctx_->metadata_builder = + TableMetadataBuilder::BuildFrom(ctx_->table->metadata().get()); + for (const auto& update : pending_updates_) { + auto commit_status = update->Commit(); + if (!commit_status.has_value()) { + return std::unexpected(commit_status.error()); + } + } + } + + ICEBERG_ASSIGN_OR_RAISE(auto requirements, TableRequirements::ForUpdateTable( + *ctx_->metadata_builder->base(), + ctx_->metadata_builder->changes())); + + return ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, + ctx_->metadata_builder->changes()); +} + +bool Transaction::CanRetry() const { + if (ctx_->kind == TransactionKind::kCreate) { + return false; + } + for (const auto& update : pending_updates_) { + if (!update->IsRetryable()) { + return false; + } + } + return true; +} + Result> Transaction::NewUpdatePartitionSpec() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_spec, UpdatePartitionSpec::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index ec8c4db0a..19637b5df 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -138,6 +138,12 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> CommitOnce(); + + /// \brief Whether this transaction can retry after a commit conflict. + bool CanRetry() const; + private: friend class PendingUpdate; diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index f67be18cd..6d7994056 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -58,6 +58,9 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { /// \brief Return the kind of this pending update. virtual Kind kind() const = 0; + /// \brief Whether this update can be retried after a commit conflict. + virtual bool IsRetryable() const { return true; } + /// \brief Apply the pending changes and commit. /// /// \return An OK status if the commit was successful, or an error: diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index 3e5792667..fc8ddd58a 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -218,7 +218,8 @@ Result> SnapshotUpdate::WriteDeleteManifests( } int64_t SnapshotUpdate::SnapshotId() { - if (!snapshot_id_.has_value()) { + while (!snapshot_id_.has_value() || + base().SnapshotById(snapshot_id_.value()).has_value()) { snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base()); } return snapshot_id_.value(); @@ -226,6 +227,18 @@ int64_t SnapshotUpdate::SnapshotId() { Result SnapshotUpdate::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + if (staged_snapshot_ != nullptr) { + for (const auto& manifest_list : manifest_lists_) { + std::ignore = DeleteFile(manifest_list); + } + manifest_lists_.clear(); + CleanUncommitted(std::unordered_set{}); + + staged_snapshot_ = nullptr; + summary_.Clear(); + } + ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot, SnapshotUtil::OptionalLatestSnapshot(base(), target_branch_)); diff --git a/src/iceberg/util/retry_util.h b/src/iceberg/util/retry_util.h new file mode 100644 index 000000000..2e5c36c1e --- /dev/null +++ b/src/iceberg/util/retry_util.h @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Configuration for retry behavior +struct RetryConfig { + /// Maximum number of retry attempts (not including the first attempt) + int32_t num_retries = 4; + /// Minimum wait time between retries in milliseconds + int32_t min_wait_ms = 100; + /// Maximum wait time between retries in milliseconds + int32_t max_wait_ms = 60 * 1000; // 1 minute + /// Total maximum time for all retries in milliseconds + int32_t total_timeout_ms = 30 * 60 * 1000; // 30 minutes + /// Exponential backoff scale factor + double scale_factor = 2.0; +}; + +/// \brief Utility class for running tasks with retry logic +class RetryRunner { + public: + /// \brief Construct a RetryRunner with the given configuration + explicit RetryRunner(RetryConfig config = {}) : config_(std::move(config)) {} + + /// \brief Specify error types that should trigger a retry. + RetryRunner& OnlyRetryOn(std::initializer_list error_kinds) { + only_retry_on_ = std::vector(error_kinds); + return *this; + } + + /// \brief Specify a single error type that should trigger a retry + RetryRunner& OnlyRetryOn(ErrorKind error_kind) { + only_retry_on_ = std::vector{error_kind}; + return *this; + } + + /// \brief Specify error types that should stop retries immediately + RetryRunner& StopRetryOn(std::initializer_list error_kinds) { + stop_retry_on_ = std::vector(error_kinds); + return *this; + } + + /// \brief Run a task that returns a Result + template ::value_type> + Result Run(F&& task, int32_t* attempt_counter = nullptr) { + auto start_time = std::chrono::steady_clock::now(); + int32_t attempt = 0; + int32_t max_attempts = config_.num_retries + 1; + + while (true) { + ++attempt; + if (attempt_counter != nullptr) { + *attempt_counter = attempt; + } + + auto result = task(); + if (result.has_value()) { + return result; + } + + const auto& error = result.error(); + + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time) + .count(); + + // total_timeout_ms <= 0 means no total timeout limit + bool timed_out = config_.total_timeout_ms > 0 && elapsed > config_.total_timeout_ms; + if (attempt >= max_attempts || timed_out) { + return result; + } + + if (!ShouldRetry(error.kind)) { + return result; + } + + int32_t delay_ms = CalculateDelay(attempt); + Sleep(delay_ms); + } + } + + private: + /// \brief Check if the given error kind should trigger a retry. + bool ShouldRetry(ErrorKind kind) const { + if (!only_retry_on_.empty()) { + return std::ranges::any_of(only_retry_on_, + [kind](ErrorKind k) { return kind == k; }); + } + + if (!stop_retry_on_.empty()) { + return !std::ranges::any_of(stop_retry_on_, + [kind](ErrorKind k) { return kind == k; }); + } + + return true; + } + + /// \brief Calculate delay with exponential backoff and jitter + int32_t CalculateDelay(int32_t attempt) const { + // Calculate base delay with exponential backoff + double base_delay = config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1); + int32_t delay_ms = static_cast( + std::min(base_delay, static_cast(config_.max_wait_ms))); + + static thread_local std::mt19937 gen(std::random_device{}()); + int32_t jitter_range = std::max(1, delay_ms / 10); + std::uniform_int_distribution<> dis(0, jitter_range - 1); + delay_ms += dis(gen); + return std::max(1, delay_ms); + } + + /// \brief Sleep for the specified duration + void Sleep(int32_t ms) const { + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + } + + RetryConfig config_; + std::vector only_retry_on_; + std::vector stop_retry_on_; +}; + +/// \brief Helper function to create a RetryRunner with table commit configuration +inline RetryRunner MakeCommitRetryRunner(int32_t num_retries, int32_t min_wait_ms, + int32_t max_wait_ms, int32_t total_timeout_ms) { + return RetryRunner(RetryConfig{.num_retries = num_retries, + .min_wait_ms = min_wait_ms, + .max_wait_ms = max_wait_ms, + .total_timeout_ms = total_timeout_ms}) + .OnlyRetryOn(ErrorKind::kCommitFailed); +} + +} // namespace iceberg