Skip to content

Commit

Permalink
hard-code compaction tasks to use ARRAY for multi-value handling to preserve order (#17110)
Browse files Browse the repository at this point in the history
  • Loading branch information
clintropolis committed Sep 19, 2024
1 parent 4057236 commit 4f137d2
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
Expand Down Expand Up @@ -989,9 +990,19 @@ private void processDimensionsSpec(final QueryableIndex index)
// column for it.
final ColumnHolder columnHolder = index.getColumnHolder(dimension);
if (columnHolder != null) {
DimensionSchema schema = columnHolder.getColumnFormat().getColumnSchema(dimension);
// rewrite string dimensions to always use MultiValueHandling.ARRAY since it preserves the exact order of
// the row regardless of the mode the initial ingest was using
if (schema instanceof StringDimensionSchema) {
schema = new StringDimensionSchema(
schema.getName(),
DimensionSchema.MultiValueHandling.ARRAY,
schema.hasBitmapIndex()
);
}
dimensionSchemaMap.put(
dimension,
columnHolder.getColumnFormat().getColumnSchema(dimension)
schema
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
Expand Down Expand Up @@ -180,7 +182,12 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() throws
expectedLongSumMetric.put("fieldName", "val");
CompactionState expectedState = new CompactionState(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedLongSumMetric),
null,
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
Expand Down Expand Up @@ -230,7 +237,12 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc
Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new HashedPartitionsSpec(null, 3, null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedLongSumMetric),
null,
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
Expand Down Expand Up @@ -295,7 +307,12 @@ public void testRunParallelWithRangePartitioning() throws Exception
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new SingleDimensionPartitionsSpec(7, null, "dim", false),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedLongSumMetric),
null,
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
Expand Down Expand Up @@ -410,7 +427,12 @@ public void testRunParallelWithMultiDimensionRangePartitioning() throws Exceptio
Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedLongSumMetric),
null,
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
Expand Down Expand Up @@ -460,7 +482,12 @@ public void testRunParallelWithRangePartitioningWithSingleTask() throws Exceptio
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new SingleDimensionPartitionsSpec(7, null, "dim", false),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedLongSumMetric),
null,
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
Expand Down Expand Up @@ -513,7 +540,12 @@ public void testRunParallelWithMultiDimensionRangePartitioningWithSingleTask() t
Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedLongSumMetric),
null,
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
Expand Down Expand Up @@ -596,7 +628,12 @@ public void testRunCompactionWithFilterShouldStoreInState() throws Exception
expectedLongSumMetric.put("fieldName", "val");
CompactionState expectedState = new CompactionState(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedLongSumMetric),
getObjectMapper().readValue(
getObjectMapper().writeValueAsString(compactionTask.getTransformSpec()),
Expand Down Expand Up @@ -658,7 +695,12 @@ public void testRunCompactionWithNewMetricsShouldStoreInState() throws Exception
expectedLongSumMetric.put("fieldName", "val");
CompactionState expectedState = new CompactionState(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedCountMetric, expectedLongSumMetric),
getObjectMapper().readValue(
getObjectMapper().writeValueAsString(compactionTask.getTransformSpec()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
Expand Down Expand Up @@ -227,7 +228,12 @@ public static CompactionState getDefaultCompactionState(
segmentGranularity,
queryGranularity,
intervals,
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
expectedLongSumMetric
);
}
Expand Down Expand Up @@ -386,7 +392,12 @@ public void testRunWithHashPartitioning() throws Exception
expectedLongSumMetric.put("fieldName", "val");
CompactionState expectedState = new CompactionState(
new HashedPartitionsSpec(null, 3, null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedLongSumMetric),
null,
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
Expand Down Expand Up @@ -801,7 +812,12 @@ public void testCompactionWithFilterInTransformSpec() throws Exception
expectedLongSumMetric.put("fieldName", "val");
CompactionState expectedCompactionState = new CompactionState(
new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedLongSumMetric),
getObjectMapper().readValue(getObjectMapper().writeValueAsString(compactionTask.getTransformSpec()), Map.class),
IndexSpec.DEFAULT.asMap(mapper),
Expand Down Expand Up @@ -866,7 +882,12 @@ public void testCompactionWithNewMetricInMetricsSpec() throws Exception
expectedLongSumMetric.put("fieldName", "val");
CompactionState expectedCompactionState = new CompactionState(
new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null)
)
),
ImmutableList.of(expectedCountMetric, expectedLongSumMetric),
getObjectMapper().readValue(getObjectMapper().writeValueAsString(compactionTask.getTransformSpec()), Map.class),
IndexSpec.DEFAULT.asMap(mapper),
Expand Down Expand Up @@ -1676,8 +1697,8 @@ public void testRunWithSpatialDimensions() throws Exception
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1)),
DimensionsSpec.builder()
.setDimensions(Arrays.asList(
new StringDimensionSchema("ts"),
new StringDimensionSchema("dim"),
new StringDimensionSchema("ts", DimensionSchema.MultiValueHandling.ARRAY, null),
new StringDimensionSchema("dim", DimensionSchema.MultiValueHandling.ARRAY, null),
new NewSpatialDimensionSchema("spatial", Collections.singletonList("spatial"))
))
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1619,10 +1619,10 @@ private Granularity chooseFinestGranularityHelper(List<Granularity> granularitie
private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration()
{
return ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))),
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
);
Expand All @@ -1632,27 +1632,27 @@ private static List<DimensionSchema> getDimensionSchema(DimensionSchema mixedTyp
{
return Lists.newArrayList(
new LongDimensionSchema("timestamp"),
new StringDimensionSchema("string_dim_4"),
new StringDimensionSchema("string_dim_4", DimensionSchema.MultiValueHandling.ARRAY, null),
new LongDimensionSchema("long_dim_4"),
new FloatDimensionSchema("float_dim_4"),
new DoubleDimensionSchema("double_dim_4"),
new StringDimensionSchema("string_dim_0"),
new StringDimensionSchema("string_dim_0", DimensionSchema.MultiValueHandling.ARRAY, null),
new LongDimensionSchema("long_dim_0"),
new FloatDimensionSchema("float_dim_0"),
new DoubleDimensionSchema("double_dim_0"),
new StringDimensionSchema("string_dim_1"),
new StringDimensionSchema("string_dim_1", DimensionSchema.MultiValueHandling.ARRAY, null),
new LongDimensionSchema("long_dim_1"),
new FloatDimensionSchema("float_dim_1"),
new DoubleDimensionSchema("double_dim_1"),
new StringDimensionSchema("string_dim_2"),
new StringDimensionSchema("string_dim_2", DimensionSchema.MultiValueHandling.ARRAY, null),
new LongDimensionSchema("long_dim_2"),
new FloatDimensionSchema("float_dim_2"),
new DoubleDimensionSchema("double_dim_2"),
new StringDimensionSchema("string_dim_3"),
new StringDimensionSchema("string_dim_3", DimensionSchema.MultiValueHandling.ARRAY, null),
new LongDimensionSchema("long_dim_3"),
new FloatDimensionSchema("float_dim_3"),
new DoubleDimensionSchema("double_dim_3"),
new StringDimensionSchema("string_dim_5"),
new StringDimensionSchema("string_dim_5", DimensionSchema.MultiValueHandling.ARRAY, null),
new LongDimensionSchema("long_dim_5"),
new FloatDimensionSchema("float_dim_5"),
new DoubleDimensionSchema("double_dim_5"),
Expand Down

0 comments on commit 4f137d2

Please sign in to comment.