Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ set(ICEBERG_SOURCES
name_mapping.cc
partition_field.cc
partition_spec.cc
row/arrow_array_wrapper.cc
row/manifest_wrapper.cc
schema.cc
schema_field.cc
schema_internal.cc
Expand Down Expand Up @@ -98,6 +100,7 @@ iceberg_install_all_headers(iceberg)

add_subdirectory(catalog)
add_subdirectory(expression)
add_subdirectory(row)
add_subdirectory(util)

install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
Expand Down
57 changes: 32 additions & 25 deletions src/iceberg/manifest_reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

#include "manifest_reader_internal.h"

#include <array>

#include <nanoarrow/nanoarrow.h>

#include "iceberg/arrow_c_data_guard_internal.h"
Expand All @@ -29,6 +27,7 @@
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand All @@ -39,7 +38,7 @@ namespace iceberg {
}

#define PARSE_PRIMITIVE_FIELD(item, array_view, type) \
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
if (!ArrowArrayViewIsNull(array_view, row_idx)) { \
auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx); \
item = static_cast<type>(value); \
Expand All @@ -50,7 +49,7 @@ namespace iceberg {
}

#define PARSE_STRING_FIELD(item, array_view) \
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
if (!ArrowArrayViewIsNull(array_view, row_idx)) { \
auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx); \
item = std::string(value.data, value.size_bytes); \
Expand All @@ -61,7 +60,7 @@ namespace iceberg {
}

#define PARSE_BINARY_FIELD(item, array_view) \
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \
item = ArrowArrayViewGetInt8Vector(array_view, row_idx); \
} else if (required) { \
Expand Down Expand Up @@ -227,66 +226,67 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
auto field_name = field.value().get().name();
bool required = !field.value().get().optional();
auto view_of_column = array_view.children[idx];
switch (idx) {
case 0:
ICEBERG_ASSIGN_OR_RAISE(auto manifest_file_field, ManifestFileFieldFromIndex(idx));
switch (manifest_file_field) {
case ManifestFileField::kManifestPath:
PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, view_of_column);
break;
case 1:
case ManifestFileField::kManifestLength:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, view_of_column,
int64_t);
break;
case 2:
case ManifestFileField::kPartitionSpecId:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, view_of_column,
int32_t);
break;
case 3:
case ManifestFileField::kContent:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
ManifestFile::Content);
break;
case 4:
case ManifestFileField::kSequenceNumber:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column,
int64_t);
break;
case 5:
case ManifestFileField::kMinSequenceNumber:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, view_of_column,
int64_t);
break;
case 6:
case ManifestFileField::kAddedSnapshotId:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, view_of_column,
int64_t);
break;
case 7:
case ManifestFileField::kAddedFilesCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, view_of_column,
int32_t);
break;
case 8:
case ManifestFileField::kExistingFilesCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
view_of_column, int32_t);
break;
case 9:
case ManifestFileField::kDeletedFilesCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, view_of_column,
int32_t);
break;
case 10:
case ManifestFileField::kAddedRowsCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, view_of_column,
int64_t);
break;
case 11:
case ManifestFileField::kExistingRowsCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, view_of_column,
int64_t);
break;
case 12:
case ManifestFileField::kDeletedRowsCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, view_of_column,
int64_t);
break;
case 13:
case ManifestFileField::kPartitionFieldSummary:
ICEBERG_RETURN_UNEXPECTED(
ParsePartitionFieldSummaryList(view_of_column, manifest_files));
break;
case 14:
case ManifestFileField::kKeyMetadata:
PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, view_of_column);
break;
case 15:
case ManifestFileField::kFirstRowId:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, view_of_column,
int64_t);
break;
Expand All @@ -297,7 +297,7 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
return manifest_files;
}

Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx,
Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
std::vector<ManifestEntry>& manifest_entries) {
if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
Expand Down Expand Up @@ -357,7 +357,7 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
view_of_file_field);
break;
case 2:
for (size_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) {
for (int64_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) {
if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) {
auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field, row_idx);
std::string_view path_str(value.data, value.size_bytes);
Expand Down Expand Up @@ -512,7 +512,7 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
break;
case 4: {
auto data_file_schema =
dynamic_pointer_cast<StructType>(field.value().get().type());
internal::checked_pointer_cast<StructType>(field.value().get().type());
ICEBERG_RETURN_UNEXPECTED(
ParseDataFile(data_file_schema, view_of_column, manifest_entries));
break;
Expand Down Expand Up @@ -567,4 +567,11 @@ Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
return manifest_files;
}

// Translates a zero-based column index into the matching ManifestFileField.
//
// Valid indices are [0, kNextUnusedId). Any other value (negative, or at/after the
// kNextUnusedId sentinel) yields an InvalidArgument error instead of an enumerator.
Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
  // The sentinel enumerator doubles as the exclusive upper bound of valid indices.
  constexpr auto kFieldCount = static_cast<int32_t>(ManifestFileField::kNextUnusedId);

  const bool out_of_range = index < 0 || index >= kFieldCount;
  if (out_of_range) {
    return InvalidArgument("Invalid manifest file field index: {}", index);
  }

  return static_cast<ManifestFileField>(index);
}

} // namespace iceberg
24 changes: 24 additions & 0 deletions src/iceberg/manifest_reader_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,28 @@ class ManifestListReaderImpl : public ManifestListReader {
std::unique_ptr<Reader> reader_;
};

// Identifies a manifest-list column by its zero-based position.
//
// ParseManifestList converts the index of each child column into one of these values
// (through ManifestFileFieldFromIndex) and switches on it to decide which ManifestFile
// member to fill in. The numeric values are therefore positional: they must stay equal
// to the index of the column they describe.
enum class ManifestFileField : int32_t {
kManifestPath = 0,           // fills ManifestFile::manifest_path
kManifestLength = 1,         // fills ManifestFile::manifest_length
kPartitionSpecId = 2,        // fills ManifestFile::partition_spec_id
kContent = 3,                // fills ManifestFile::content
kSequenceNumber = 4,         // fills ManifestFile::sequence_number
kMinSequenceNumber = 5,      // fills ManifestFile::min_sequence_number
kAddedSnapshotId = 6,        // fills ManifestFile::added_snapshot_id
kAddedFilesCount = 7,        // fills ManifestFile::added_files_count
kExistingFilesCount = 8,     // fills ManifestFile::existing_files_count
kDeletedFilesCount = 9,      // fills ManifestFile::deleted_files_count
kAddedRowsCount = 10,        // fills ManifestFile::added_rows_count
kExistingRowsCount = 11,     // fills ManifestFile::existing_rows_count
kDeletedRowsCount = 12,      // fills ManifestFile::deleted_rows_count
kPartitionFieldSummary = 13, // handled by ParsePartitionFieldSummaryList
kKeyMetadata = 14,           // fills ManifestFile::key_metadata
kFirstRowId = 15,            // fills ManifestFile::first_row_id
// kNextUnusedId is a sentinel, not a real column: it holds the next unused index.
// ManifestFileFieldFromIndex treats it as the exclusive upper bound of valid indices,
// so always keep it last and bump its value whenever a new field is added.
kNextUnusedId = 16,
};

Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index);

} // namespace iceberg
18 changes: 18 additions & 0 deletions src/iceberg/row/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# NOTE(review): iceberg_install_all_headers is defined elsewhere in the build. Judging by
# its name and argument it installs this directory's headers under iceberg/row — confirm
# against the helper's definition.
iceberg_install_all_headers(iceberg/row)
Loading
Loading