Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/iceberg/manifest/manifest_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,12 @@ ICEBERG_EXPORT inline constexpr Result<ManifestContent> ManifestContentFromStrin
}

} // namespace iceberg

namespace std {
template <>
struct hash<iceberg::ManifestFile> {
size_t operator()(const iceberg::ManifestFile& manifest_file) const {
return std::hash<std::string>{}(manifest_file.manifest_path);
}
};
} // namespace std
214 changes: 201 additions & 13 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,90 @@ Status TableScanContext::Validate() const {
return {};
}

bool IsScanCurrentLineage(const TableScanContext& context) {
return !context.from_snapshot_id.has_value() && !context.to_snapshot_id.has_value();
}

Result<int64_t> ToSnapshotIdInclusive(const TableScanContext& context,
const TableMetadata& metadata) {
// Get the branch's current snapshot ID if branch is set
std::shared_ptr<Snapshot> branch_snapshot;
const std::string& branch = context.branch;
if (!branch.empty()) {
auto iter = metadata.refs.find(branch);
ICEBERG_CHECK(iter != metadata.refs.end() && iter->second != nullptr,
"Cannot find branch: {}", branch);
ICEBERG_ASSIGN_OR_RAISE(branch_snapshot,
metadata.SnapshotById(iter->second->snapshot_id));
}

if (context.to_snapshot_id.has_value()) {
int64_t to_snapshot_id_value = context.to_snapshot_id.value();

if (branch_snapshot != nullptr) {
// Validate `to_snapshot_id` is on the current branch
ICEBERG_ASSIGN_OR_RAISE(
bool is_ancestor,
SnapshotUtil::IsAncestorOf(metadata, branch_snapshot->snapshot_id,
to_snapshot_id_value));
ICEBERG_CHECK(is_ancestor,
"End snapshot is not a valid snapshot on the current branch: {}",
branch);
}

return to_snapshot_id_value;
}

// If to_snapshot_id is not set, use branch's current snapshot if branch is set
if (branch_snapshot != nullptr) {
return branch_snapshot->snapshot_id;
}

// Get current snapshot from table's current snapshot
std::shared_ptr<Snapshot> current_snapshot;
ICEBERG_ASSIGN_OR_RAISE(current_snapshot, metadata.Snapshot());
ICEBERG_CHECK(current_snapshot != nullptr,
"End snapshot is not set and table has no current snapshot");
return current_snapshot->snapshot_id;
}

Result<std::optional<int64_t>> FromSnapshotIdExclusive(const TableScanContext& context,
const TableMetadata& metadata,
int64_t to_snapshot_id_inclusive) {
if (!context.from_snapshot_id.has_value()) {
return std::nullopt;
}

int64_t from_snapshot_id = context.from_snapshot_id.value();

// Validate `from_snapshot_id` is an ancestor of `to_snapshot_id_inclusive`
if (context.from_snapshot_id_inclusive) {
ICEBERG_ASSIGN_OR_RAISE(
bool is_ancestor,
SnapshotUtil::IsAncestorOf(metadata, to_snapshot_id_inclusive, from_snapshot_id));
ICEBERG_CHECK(
is_ancestor,
"Starting snapshot (inclusive) {} is not an ancestor of end snapshot {}",
from_snapshot_id, to_snapshot_id_inclusive);

// For inclusive behavior, return the parent snapshot ID (can be nullopt)
ICEBERG_ASSIGN_OR_RAISE(auto from_snapshot, metadata.SnapshotById(from_snapshot_id));
return from_snapshot->parent_snapshot_id;
}

// Validate there is an ancestor of `to_snapshot_id_inclusive` where parent is
// `from_snapshot_id`
ICEBERG_ASSIGN_OR_RAISE(bool is_parent_ancestor,
SnapshotUtil::IsParentAncestorOf(
metadata, to_snapshot_id_inclusive, from_snapshot_id));
ICEBERG_CHECK(
is_parent_ancestor,
"Starting snapshot (exclusive) {} is not a parent ancestor of end snapshot {}",
from_snapshot_id, to_snapshot_id_inclusive);

return from_snapshot_id;
}

} // namespace internal

ScanTask::~ScanTask() = default;
Expand Down Expand Up @@ -340,10 +424,15 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::AsOfTime(

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
int64_t from_snapshot_id, bool inclusive)
requires IsIncrementalScan<ScanType>
{
AddError(NotImplemented("Incremental scan is not implemented"));
if (inclusive) {
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore,
metadata_->SnapshotById(from_snapshot_id));
}
this->context_.from_snapshot_id = from_snapshot_id;
this->context_.from_snapshot_id_inclusive = inclusive;
return *this;
}

Expand All @@ -352,31 +441,45 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
const std::string& ref, bool inclusive)
requires IsIncrementalScan<ScanType>
{
AddError(NotImplemented("Incremental scan is not implemented"));
return *this;
auto iter = metadata_->refs.find(ref);
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref);
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
"Ref {} is not a tag", ref);
return FromSnapshot(iter->second->snapshot_id, inclusive);
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t to_snapshot_id)
requires IsIncrementalScan<ScanType>
{
AddError(NotImplemented("Incremental scan is not implemented"));
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(to_snapshot_id));
context_.to_snapshot_id = to_snapshot_id;
return *this;
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(const std::string& ref)
requires IsIncrementalScan<ScanType>
{
AddError(NotImplemented("Incremental scan is not implemented"));
return *this;
auto iter = metadata_->refs.find(ref);
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref);
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
"Ref {} is not a tag", ref);
return ToSnapshot(iter->second->snapshot_id);
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
const std::string& branch)
requires IsIncrementalScan<ScanType>
{
auto iter = metadata_->refs.find(branch);
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", branch);
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", branch);
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kBranch,
"Ref {} is not a branch", branch);
context_.branch = branch;
return *this;
}
Expand Down Expand Up @@ -539,17 +642,77 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
// IncrementalAppendScan implementation

Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
[[maybe_unused]] std::shared_ptr<Schema> schema,
[[maybe_unused]] std::shared_ptr<FileIO> io,
[[maybe_unused]] internal::TableScanContext context) {
return NotImplemented("IncrementalAppendScan is not implemented");
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> io, internal::TableScanContext context) {
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
return std::unique_ptr<IncrementalAppendScan>(new IncrementalAppendScan(
std::move(metadata), std::move(schema), std::move(io), std::move(context)));
}

Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
int64_t to_snapshot_id_inclusive) const {
return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
ICEBERG_ASSIGN_OR_RAISE(
auto ancestors_snapshots,
SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
from_snapshot_id_exclusive));

std::vector<std::shared_ptr<Snapshot>> append_snapshots;
std::ranges::copy_if(ancestors_snapshots, std::back_inserter(append_snapshots),
[](const auto& snapshot) {
return snapshot != nullptr &&
snapshot->Operation().has_value() &&
snapshot->Operation().value() == DataOperation::kAppend;
});
if (append_snapshots.empty()) {
return std::vector<std::shared_ptr<FileScanTask>>{};
}

std::unordered_set<int64_t> snapshot_ids;
std::ranges::transform(append_snapshots,
std::inserter(snapshot_ids, snapshot_ids.end()),
[](const auto& snapshot) { return snapshot->snapshot_id; });

std::unordered_set<ManifestFile> data_manifests;
for (const auto& snapshot : append_snapshots) {
SnapshotCache snapshot_cache(snapshot.get());
ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(io_));
std::ranges::copy_if(manifests, std::inserter(data_manifests, data_manifests.end()),
[&snapshot_ids](const ManifestFile& manifest) {
return snapshot_ids.contains(manifest.added_snapshot_id);
});
}
if (data_manifests.empty()) {
return std::vector<std::shared_ptr<FileScanTask>>{};
}

TableMetadataCache metadata_cache(metadata_.get());
ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());

ICEBERG_ASSIGN_OR_RAISE(
auto manifest_group,
ManifestGroup::Make(
io_, schema_, specs_by_id,
std::vector<ManifestFile>(data_manifests.begin(), data_manifests.end()), {}));

manifest_group->CaseSensitive(context_.case_sensitive)
.Select(ScanColumns())
.FilterData(filter())
.FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
return entry.snapshot_id.has_value() &&
snapshot_ids.contains(entry.snapshot_id.value()) &&
entry.status == ManifestStatus::kAdded;
})
.IgnoreDeleted()
.ColumnsToKeepStats(context_.columns_to_keep_stats);

if (context_.ignore_residuals) {
manifest_group->IgnoreResiduals();
}

return manifest_group->PlanFiles();
}

// IncrementalChangelogScan implementation
Expand All @@ -568,4 +731,29 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
}

// Explicit template implementations for IncrementalScan
// This moves the template implementation from header to source file
template <typename ScanTaskType>
Result<std::vector<std::shared_ptr<ScanTaskType>>>
IncrementalScan<ScanTaskType>::PlanFiles() const {
if (IsScanCurrentLineage(context_)) {
ICEBERG_ASSIGN_OR_RAISE(auto current_snapshot, metadata_->Snapshot());
if (current_snapshot == nullptr) {
return std::vector<std::shared_ptr<ScanTaskType>>{};
}
}

ICEBERG_ASSIGN_OR_RAISE(int64_t to_snapshot_id_inclusive,
internal::ToSnapshotIdInclusive(context_, *metadata_));
ICEBERG_ASSIGN_OR_RAISE(
std::optional<int64_t> from_snapshot_id_exclusive,
internal::FromSnapshotIdExclusive(context_, *metadata_, to_snapshot_id_inclusive));

return PlanFiles(from_snapshot_id_exclusive, to_snapshot_id_inclusive);
}

// Explicitly instantiate the templates
template class IncrementalScan<FileScanTask>;
template class IncrementalScan<ChangelogScanTask>;

} // namespace iceberg
14 changes: 11 additions & 3 deletions src/iceberg/table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "iceberg/arrow_c_data.h"
#include "iceberg/result.h"
#include "iceberg/table_metadata.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/error_collector.h"

Expand Down Expand Up @@ -361,9 +362,7 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan {

/// \brief Plans the scan tasks by resolving manifests and data files.
/// \return A Result containing scan tasks or an error.
Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const {
return NotImplemented("IncrementalScan::PlanFiles is not implemented");
}
Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const;

protected:
virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles(
Expand All @@ -373,6 +372,9 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan {
using TableScan::TableScan;
};

extern template class IncrementalScan<FileScanTask>;
extern template class IncrementalScan<ChangelogScanTask>;

/// \brief A scan that reads data files added between snapshots (incremental appends).
class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan<FileScanTask> {
public:
Expand All @@ -383,6 +385,9 @@ class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan<FileScanTask

~IncrementalAppendScan() override = default;

// Bring the public PlanFiles() from base class into scope
using IncrementalScan<FileScanTask>::PlanFiles;

protected:
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
Expand All @@ -402,6 +407,9 @@ class ICEBERG_EXPORT IncrementalChangelogScan

~IncrementalChangelogScan() override = default;

// Bring the public PlanFiles() from base class into scope
using IncrementalScan<ChangelogScanTask>::PlanFiles;

protected:
Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
file_scan_task_test.cc
incremental_append_scan_test.cc
table_scan_test.cc)

add_iceberg_test(table_update_test
Expand Down
Loading
Loading