Commit
ignore void transform partitions from current partition field list (#606)

* ignore void transform partitions from current partition field list

* use correct method calls to get partition schema

* test tableDTO instead of tableDto

* Add safeguard to omit void partitions

* revert fast property changes

* Delete omitVoidPartitionFields from Config interface and impl

* revert last changes

* use equalsIgnoreCase and remove extra lines

---------

Co-authored-by: Gabriel Igliozzi <[email protected]>
gabeiglio and Gabriel Igliozzi authored Sep 26, 2024
1 parent 3dd4eb4 commit dc3bc13
Showing 6 changed files with 215 additions and 6 deletions.
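In short: when a partition field is dropped from an Iceberg v1 table, the metadata keeps the field in the spec but replaces its transform with void, and such fields should no longer be reported as partition keys. Below is a minimal sketch of the rule this commit applies; the class and method names here are hypothetical, and the real change lives in HiveTypeConverter.icebergSchemaTofieldDtos in the diff that follows.

import org.apache.iceberg.PartitionField;

// Hypothetical helper (not part of the commit) mirroring its filter:
// a field counts as an active partition key only when it has a transform
// and that transform is not "void" (Iceberg v1 keeps dropped partition
// fields in the spec as void transforms).
final class PartitionFieldRules {
    static boolean isActivePartitionKey(final PartitionField field) {
        return field.transform() != null
            && !field.transform().toString().equalsIgnoreCase("void");
    }
}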
@@ -188,7 +188,7 @@ public TableInfo fromIcebergTableToTableInfo(final QualifiedName name,
final TableInfo tableInfo) {
final org.apache.iceberg.Table table = tableWrapper.getTable();
final List<FieldInfo> allFields =
-            this.hiveTypeConverter.icebergeSchemaTofieldDtos(table.schema(), table.spec().fields());
+            this.hiveTypeConverter.icebergSchemaTofieldDtos(table.schema(), table.spec().fields());
final Map<String, String> tableParameters = new HashMap<>();
tableParameters.put(DirectSqlTable.PARAM_TABLE_TYPE, DirectSqlTable.ICEBERG_TABLE_TYPE);
tableParameters.put(DirectSqlTable.PARAM_METADATA_LOCATION, tableLoc);
@@ -108,12 +108,13 @@ public Type toMetacatType(final String type) {
* @param partitionFields partitioned fields
* @return list of field Info
*/
-    public List<FieldInfo> icebergeSchemaTofieldDtos(final Schema schema,
+    public List<FieldInfo> icebergSchemaTofieldDtos(final Schema schema,
final List<PartitionField> partitionFields) {
final List<FieldInfo> fields = Lists.newArrayList();
-        final List<String> partitionNames =
-            partitionFields.stream()
-                .map(f -> schema.findField(f.sourceId()).name()).collect(Collectors.toList());
+        final List<String> partitionNames = partitionFields.stream()
+            .filter(f -> f.transform() != null && !f.transform().toString().equalsIgnoreCase("void"))
+            .map(f -> schema.findField(f.sourceId()).name())
+            .collect(Collectors.toList());

for (Types.NestedField field : schema.columns()) {
final FieldInfo fieldInfo = new FieldInfo();
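With this filter in place, a spec containing identity(field1) and void(field2) yields partitionNames == ["field1"], so field2 is reported as an ordinary column rather than a partition key. The new HiveTypeConverterSpec test below exercises exactly this case.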
@@ -19,6 +19,7 @@ package com.netflix.metacat.connector.hive.converters
import org.apache.iceberg.PartitionField
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.Schema
+import org.apache.iceberg.transforms.Identity
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Types
import com.netflix.metacat.common.QualifiedName
@@ -58,7 +59,7 @@ class HiveConnectorInfoConvertorSpec extends Specification{
def setup() {
// Stub this to always return true
config.isEpochInSeconds() >> true
-        converter = new HiveConnectorInfoConverter( new HiveTypeConverter())
+        converter = new HiveConnectorInfoConverter(new HiveTypeConverter())
}

def 'test date to epoch seconds'() {
@@ -512,6 +513,7 @@ class HiveConnectorInfoConvertorSpec extends Specification{
def tableInfo = converter.fromIcebergTableToTableInfo(QualifiedName.ofTable('c', 'd', 't'),
icebergTableWrapper, "/tmp/test", TableInfo.builder().build() )
then:
+        2 * field.transform() >> Mock(Identity)
1 * icebergTable.properties() >> ["test":"abd"]
2 * icebergTable.spec() >> partSpec
1 * partSpec.fields() >> [ field]
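Note: field.transform() is stubbed for two invocations because the new filter reads the transform twice, once for the null check and once for the toString() comparison.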
@@ -13,6 +13,11 @@

package com.netflix.metacat.connector.hive.converters

+import org.apache.iceberg.PartitionField
+import org.apache.iceberg.transforms.Identity
+import org.apache.iceberg.transforms.VoidTransform
+import org.apache.iceberg.Schema
+import org.apache.iceberg.types.Types
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll
@@ -255,4 +260,43 @@ class HiveTypeConverterSpec extends Specification {
"array<struct<date:string,countryCodes:array<string>,source:string>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}"""
"array<struct<Date:string,nestedArray:array<struct<date:string,countryCodes:array<string>,source:string>>>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"Date","type":"string"},{"name":"nestedArray","type":{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}}]}}"""
}

def "Test treat void transforms partitions as non-partition field"() {
given:
// Initial schema with three fields
def initialSchema = new Schema(
Types.NestedField.optional(1, "field1", Types.BooleanType.get(), "added 1st - partition key"),
Types.NestedField.optional(2, "field2", Types.StringType.get(), "added 2nd"),
Types.NestedField.optional(3, "field3", Types.IntegerType.get(), "added 3rd")
)

// Initial partition fields
def initialPartitionFields = [
new PartitionField(1, 1, "field1", new Identity()),
new PartitionField(2, 2, "field2", new VoidTransform<String>()),
]

when:
def fieldDtos = this.converter.icebergSchemaTofieldDtos(initialSchema, initialPartitionFields)

then:
fieldDtos.size() == 3

// Validate the first field
def field1 = fieldDtos.find { it.name == "field1" }
field1 != null
field1.partitionKey == true

// Validate the second field
def field2 = fieldDtos.find { it.name == "field2" }
field2 != null
field2.partitionKey == false

// Validate the third field
def field3 = fieldDtos.find { it.name == "field3" }
field3 != null
field3.partitionKey == false

noExceptionThrown()
}
}
@@ -0,0 +1,117 @@
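The new test fixture below is an Iceberg v1 table-metadata file. It carries two partition specs: spec 0 partitions by field1 and field2 with identity transforms, while spec 1, selected by default-spec-id, keeps field2 with a void transform, which is how v1 metadata represents a dropped partition field. Only field1 should therefore surface as a partition key.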
{
  "format-version" : 1,
  "table-uuid" : "77ea2333-acd1-4d8b-9870-b3dcece00c87",
  "location" : "file:/tmp/dat",
  "last-updated-ms" : 1725487921787,
  "last-column-id" : 4,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "field1",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "field2",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "field3",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 4,
      "name" : "field4",
      "required" : false,
      "type" : "int"
    } ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "field1",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "field2",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "field3",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 4,
      "name" : "field4",
      "required" : false,
      "type" : "int"
    } ]
  } ],
  "partition-spec" : [ {
    "name" : "field1",
    "transform" : "identity",
    "source-id" : 1,
    "field-id" : 1000
  }, {
    "name" : "field2",
    "transform" : "void",
    "source-id" : 2,
    "field-id" : 1001
  } ],
  "default-spec-id" : 1,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "field1",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    }, {
      "name" : "field2",
      "transform" : "identity",
      "source-id" : 2,
      "field-id" : 1001
    } ]
  }, {
    "spec-id" : 1,
    "fields" : [ {
      "name" : "field1",
      "transform" : "identity",
      "source-id" : 1,
      "field-id" : 1000
    }, {
      "name" : "field2",
      "transform" : "void",
      "source-id" : 2,
      "field-id" : 1001
    } ]
  } ],
  "last-partition-id" : 1001,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "owner",
    "acls" : "[{\"format_version\":1,\"principals\":[{\"name\":\"123\",\"principal_type\":\"USER\"}],\"resources\":[{\"resource_type\":\"TABLE\",\"uuid\":\"77ea2333-acd1-4d8b-9870-b3dcece00c87\",\"parent\":{\"resource_type\":\"SCHEMA\",\"name\":\"owner\",\"parent\":{\"resource_type\":\"CATALOG\",\"name\":\"prodhive\",\"parent\":null}}}],\"privileges\":[\"ALL\"],\"grantee\":{\"name\":\"123\",\"principal_type\":\"USER\"},\"with_grant\":true},{\"format_version\":1,\"principals\":[{\"name\":\"123\",\"principal_type\":\"USER\"}],\"resources\":[{\"resource_type\":\"TABLE\",\"uuid\":\"77ea2333-acd1-4d8b-9870-b3dcece00c87\",\"parent\":{\"resource_type\":\"SCHEMA\",\"name\":\"owner\",\"parent\":{\"resource_type\":\"CATALOG\",\"name\":\"prodhive\",\"parent\":null}}}],\"privileges\":[\"ALL\"],\"grantee\":{\"name\":\"123\",\"principal_type\":\"USER\"},\"with_grant\":false}]",
    "field.metadata.json" : "{\"1\":{},\"2\":{},\"3\":{},\"4\":{}}"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ {
    "timestamp-ms" : 1725487921770,
    "metadata-file" : "file:/tmp/data/metadata/00001-abf48887-aa4f-4bcc-9219-1e1721314ee1.metadata.json"
  } ]
}
@@ -41,6 +41,7 @@ import feign.RetryableException
import feign.Retryer
import groovy.sql.Sql
import org.apache.commons.io.FileUtils
+import org.apache.iceberg.PartitionField
import org.joda.time.Instant
import org.skyscreamer.jsonassert.JSONAssert
import spock.lang.Ignore
@@ -831,6 +832,50 @@ class MetacatSmokeSpec extends Specification {
api.deleteTable(catalogName, databaseName, tableName)
}

    @Unroll
    def "Test ignore void transform as partition fields"() {
        given:
        def catalogName = 'embedded-fast-hive-metastore'
        def databaseName = 'iceberg_db'
        def tableName = 'iceberg_table_6'
        def uri = isLocalEnv ? String.format('file:/tmp/data/') : null
        def tableDto = new TableDto(
            name: QualifiedName.ofTable(catalogName, databaseName, tableName),
            serde: new StorageDto(
                owner: 'metacat-test',
                inputFormat: 'org.apache.hadoop.mapred.TextInputFormat',
                outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
                serializationLib: 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
                parameters: [
                    'serialization.format': '1'
                ],
                uri: uri
            ),
            definitionMetadata: null,
            dataMetadata: null,
            fields: null
        )

        def metadataLocation = String.format('/tmp/data/metadata/00001-ba4b775c-7b1a-4a6f-aec0-03d3c851088c.metadata.json')
        def metadata = [table_type: 'ICEBERG', metadata_location: metadataLocation]
        tableDto.setMetadata(metadata)

        when:
        try {
            api.createDatabase(catalogName, databaseName, new DatabaseCreateRequestDto())
        } catch (Exception ignored) {
        }
        api.createTable(catalogName, databaseName, tableName, tableDto)
        def tableDTO = api.getTable(catalogName, databaseName, tableName, true, true, true)

        then:
        tableDTO.getFields().size() == 4
        tableDTO.getPartition_keys().size() == 1
        tableDTO.getPartition_keys()[0] == "field1"

        cleanup:
        api.deleteTable(catalogName, databaseName, tableName)
    }

    @Unroll
    def "Test get partitions from iceberg table using #filter"() {
        def catalogName = 'embedded-fast-hive-metastore'