From 4f137d27001ae5e201cf605607cd82a381492183 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 19 Sep 2024 11:56:12 -0700 Subject: [PATCH] hard-code compaction tasks to use ARRAY for multi-value handling to preserve order (#17110) --- .../indexing/common/task/CompactionTask.java | 13 ++++- .../task/CompactionTaskParallelRunTest.java | 58 ++++++++++++++++--- .../common/task/CompactionTaskRunTest.java | 33 +++++++++-- .../common/task/CompactionTaskTest.java | 20 +++---- 4 files changed, 99 insertions(+), 25 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 73c8a35405c4..b3c01d79f98b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -41,6 +41,7 @@ import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; @@ -989,9 +990,19 @@ private void processDimensionsSpec(final QueryableIndex index) // column for it. 
final ColumnHolder columnHolder = index.getColumnHolder(dimension); if (columnHolder != null) { + DimensionSchema schema = columnHolder.getColumnFormat().getColumnSchema(dimension); + // rewrite string dimensions to always use MultiValueHandling.ARRAY since it preserves the exact order of + // the row regardless of the mode the initial ingest was using + if (schema instanceof StringDimensionSchema) { + schema = new StringDimensionSchema( + schema.getName(), + DimensionSchema.MultiValueHandling.ARRAY, + schema.hasBitmapIndex() + ); + } dimensionSchemaMap.put( dimension, - columnHolder.getColumnFormat().getColumnSchema(dimension) + schema ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 377f4ece0657..6fdda300f6ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -29,8 +29,10 @@ import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -180,7 +182,12 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() throws expectedLongSumMetric.put("fieldName", "val"); CompactionState expectedState = new CompactionState( new DynamicPartitionsSpec(null, Long.MAX_VALUE), - new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedLongSumMetric), null, compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), @@ -230,7 +237,12 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); CompactionState expectedState = new CompactionState( new HashedPartitionsSpec(null, 3, null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedLongSumMetric), null, compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), @@ -295,7 +307,12 @@ public void testRunParallelWithRangePartitioning() throws Exception Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); CompactionState expectedState = new CompactionState( new SingleDimensionPartitionsSpec(7, null, "dim", false), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedLongSumMetric), null, compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), @@ -410,7 +427,12 @@ public void testRunParallelWithMultiDimensionRangePartitioning() throws Exceptio Assert.assertSame(DimensionRangeShardSpec.class, 
segment.getShardSpec().getClass()); CompactionState expectedState = new CompactionState( new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedLongSumMetric), null, compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), @@ -460,7 +482,12 @@ public void testRunParallelWithRangePartitioningWithSingleTask() throws Exceptio Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); CompactionState expectedState = new CompactionState( new SingleDimensionPartitionsSpec(7, null, "dim", false), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedLongSumMetric), null, compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), @@ -513,7 +540,12 @@ public void testRunParallelWithMultiDimensionRangePartitioningWithSingleTask() t Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass()); CompactionState expectedState = new CompactionState( new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedLongSumMetric), null, 
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), @@ -596,7 +628,12 @@ public void testRunCompactionWithFilterShouldStoreInState() throws Exception expectedLongSumMetric.put("fieldName", "val"); CompactionState expectedState = new CompactionState( new DynamicPartitionsSpec(null, Long.MAX_VALUE), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedLongSumMetric), getObjectMapper().readValue( getObjectMapper().writeValueAsString(compactionTask.getTransformSpec()), @@ -658,7 +695,12 @@ public void testRunCompactionWithNewMetricsShouldStoreInState() throws Exception expectedLongSumMetric.put("fieldName", "val"); CompactionState expectedState = new CompactionState( new DynamicPartitionsSpec(null, Long.MAX_VALUE), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedCountMetric, expectedLongSumMetric), getObjectMapper().readValue( getObjectMapper().writeValueAsString(compactionTask.getTransformSpec()), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index cdc7390eb2c3..a3fb807604fa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -34,6 +34,7 @@ import 
org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; import org.apache.druid.client.indexing.NoopOverlordClient; import org.apache.druid.data.input.impl.CSVParseSpec; +import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.NewSpatialDimensionSchema; @@ -227,7 +228,12 @@ public static CompactionState getDefaultCompactionState( segmentGranularity, queryGranularity, intervals, - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), expectedLongSumMetric ); } @@ -386,7 +392,12 @@ public void testRunWithHashPartitioning() throws Exception expectedLongSumMetric.put("fieldName", "val"); CompactionState expectedState = new CompactionState( new HashedPartitionsSpec(null, 3, null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedLongSumMetric), null, compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()), @@ -801,7 +812,12 @@ public void testCompactionWithFilterInTransformSpec() throws Exception expectedLongSumMetric.put("fieldName", "val"); CompactionState expectedCompactionState = new CompactionState( new DynamicPartitionsSpec(5000000, Long.MAX_VALUE), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new 
StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedLongSumMetric), getObjectMapper().readValue(getObjectMapper().writeValueAsString(compactionTask.getTransformSpec()), Map.class), IndexSpec.DEFAULT.asMap(mapper), @@ -866,7 +882,12 @@ public void testCompactionWithNewMetricInMetricsSpec() throws Exception expectedLongSumMetric.put("fieldName", "val"); CompactionState expectedCompactionState = new CompactionState( new DynamicPartitionsSpec(5000000, Long.MAX_VALUE), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null) + ) + ), ImmutableList.of(expectedCountMetric, expectedLongSumMetric), getObjectMapper().readValue(getObjectMapper().writeValueAsString(compactionTask.getTransformSpec()), Map.class), IndexSpec.DEFAULT.asMap(mapper), @@ -1676,8 +1697,8 @@ public void testRunWithSpatialDimensions() throws Exception ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1)), DimensionsSpec.builder() .setDimensions(Arrays.asList( - new StringDimensionSchema("ts"), - new StringDimensionSchema("dim"), + new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null), + new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null), new NewSpatialDimensionSchema("spatial", Collections.singletonList("spatial")) )) .build(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c1bf649980f6..90383b2b8e32 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -1619,10 +1619,10 @@ private Granularity chooseFinestGranularityHelper(List<Granularity> granularitie private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration() { return ImmutableList.of( - new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))), - new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))), - new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))), - new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))), + new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))), + new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))), + new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))), + new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))), new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))), new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) ); @@ -1632,27 +1632,27 @@ private static List<DimensionSchema> getDimensionSchema(DimensionSchema mixedTyp { return Lists.newArrayList( new LongDimensionSchema("timestamp"), - new StringDimensionSchema("string_dim_4"), + new StringDimensionSchema("string_dim_4", DimensionSchema.MultiValueHandling.ARRAY, null), new LongDimensionSchema("long_dim_4"), new FloatDimensionSchema("float_dim_4"), new DoubleDimensionSchema("double_dim_4"), - new StringDimensionSchema("string_dim_0"), + new StringDimensionSchema("string_dim_0", DimensionSchema.MultiValueHandling.ARRAY, null), new LongDimensionSchema("long_dim_0"), new FloatDimensionSchema("float_dim_0"), new
DoubleDimensionSchema("double_dim_0"), - new StringDimensionSchema("string_dim_1"), + new StringDimensionSchema("string_dim_1", DimensionSchema.MultiValueHandling.ARRAY, null), new LongDimensionSchema("long_dim_1"), new FloatDimensionSchema("float_dim_1"), new DoubleDimensionSchema("double_dim_1"), - new StringDimensionSchema("string_dim_2"), + new StringDimensionSchema("string_dim_2", DimensionSchema.MultiValueHandling.ARRAY, null), new LongDimensionSchema("long_dim_2"), new FloatDimensionSchema("float_dim_2"), new DoubleDimensionSchema("double_dim_2"), - new StringDimensionSchema("string_dim_3"), + new StringDimensionSchema("string_dim_3", DimensionSchema.MultiValueHandling.ARRAY, null), new LongDimensionSchema("long_dim_3"), new FloatDimensionSchema("float_dim_3"), new DoubleDimensionSchema("double_dim_3"), - new StringDimensionSchema("string_dim_5"), + new StringDimensionSchema("string_dim_5", DimensionSchema.MultiValueHandling.ARRAY, null), new LongDimensionSchema("long_dim_5"), new FloatDimensionSchema("float_dim_5"), new DoubleDimensionSchema("double_dim_5"),