Commit 73924af

[flink] Fix read partitioned table throwing "LogScanner is not subscribed any bucket"

luoyuxia committed Jan 2, 2025
1 parent 07cae40 commit 73924af
Showing 8 changed files with 101 additions and 55 deletions.
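
In short: when a Flink source reads a partitioned table, the split reader can momentarily hold no splits (for example, before the enumerator has assigned any partition), so its LogScanner is subscribed to no bucket and polling it throws the error above. The essence of the fix, using the names from the split-reader hunk below:

    // inside FlinkSourceSplitReader#fetch(), before polling the log scanner
    if (subscribedBuckets.isEmpty()) {
        return FlinkRecordsWithSplitIds.emptyRecords(flinkSourceReaderMetrics);
    }
    ScanRecords scanRecords = logScanner.poll(POLL_TIMEOUT);
    return forLogRecords(scanRecords);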
@@ -127,6 +127,11 @@ private TableDescriptor(
                         .allMatch(e -> e.getKey() != null && e.getValue() != null),
                 "options cannot have null keys or values.");
 
+        if (getAutoPartitionStrategy().isAutoPartitionEnabled() && !isPartitioned()) {
+            throw new IllegalArgumentException(
+                    "Auto partition is only supported when table is partitioned.");
+        }
+
         if (hasPrimaryKey()
                 && getKvFormat() == KvFormat.COMPACTED
                 && getLogFormat() != LogFormat.ARROW) {
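
The new check fails fast at descriptor construction. A minimal sketch of what it now rejects (the schema is a placeholder; builder usage mirrors the test added below):

    // Throws IllegalArgumentException: "Auto partition is only supported when table is partitioned."
    TableDescriptor.builder()
            .schema(schema) // no partitionedBy(...) is set
            .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
            .build();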
@@ -18,6 +18,7 @@
 
 import com.alibaba.fluss.config.ConfigBuilder;
 import com.alibaba.fluss.config.ConfigOption;
+import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
@@ -308,4 +308,16 @@ void testPartitionedTable() {
                 .hasMessage(
                         "Bucket key [f0, f3] shouldn't include any column in partition keys [f0].");
     }
+
+    @Test
+    void testAutoPartitionForNonPartitionedTableShouldThrowException() {
+        assertThatThrownBy(
+                        () ->
+                                TableDescriptor.builder()
+                                        .schema(SCHEMA_1)
+                                        .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Auto partition is only supported when table is partitioned.");
+    }
 }
@@ -138,8 +138,8 @@ public FlinkSourceEnumerator(
         this(
                 tablePath,
                 flussConf,
-                isPartitioned,
                 hasPrimaryKey,
+                isPartitioned,
                 context,
                 Collections.emptySet(),
                 Collections.emptyMap(),
@@ -151,8 +151,8 @@
     public FlinkSourceEnumerator(
             TablePath tablePath,
             Configuration flussConf,
-            boolean isPartitioned,
             boolean hasPrimaryKey,
+            boolean isPartitioned,
             SplitEnumeratorContext<SourceSplitBase> context,
             Set<TableBucket> assignedTableBuckets,
             Map<Long, String> assignedPartitions,
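
A side fix in the same file: the delegating constructor listed isPartitioned before hasPrimaryKey, while the full constructor now takes them the other way around; the swap aligns the two (presumably with call sites not shown in this diff). With two adjacent booleans the compiler cannot catch such a mix-up, which is the hazard here:

    // Two adjacent boolean parameters: swapped arguments still compile cleanly.
    static void configure(boolean hasPrimaryKey, boolean isPartitioned) { /* ... */ }
    // configure(isPartitioned, hasPrimaryKey); // wrong order, no compiler error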
@@ -306,26 +306,29 @@ private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionIn
                     }
                 };
 
+        Set<PartitionInfo> assignedOrPendingPartitions = new HashSet<>();
         assignedPartitions.forEach(
                 (partitionId, partitionName) ->
-                        dedupOrMarkAsRemoved.accept(new PartitionInfo(partitionId, partitionName)));
+                        assignedOrPendingPartitions.add(
+                                new PartitionInfo(partitionId, partitionName)));
 
-        pendingSplitAssignment.forEach(
-                (reader, splits) ->
-                        splits.forEach(
-                                split -> {
-                                    long partitionId =
-                                            checkNotNull(
-                                                    split.getTableBucket().getPartitionId(),
-                                                    "partition id shouldn't be null for the splits of partitioned table.");
-                                    String partitionName =
-                                            checkNotNull(
-                                                    split.getPartitionName(),
-                                                    "partition name shouldn't be null for the splits of partitioned table.");
-                                    PartitionInfo partitionInfo =
-                                            new PartitionInfo(partitionId, partitionName);
-                                    dedupOrMarkAsRemoved.accept(partitionInfo);
-                                }));
+        pendingSplitAssignment.values().stream()
+                .flatMap(Collection::stream)
+                .forEach(
+                        split -> {
+                            long partitionId =
+                                    checkNotNull(
+                                            split.getTableBucket().getPartitionId(),
+                                            "partition id shouldn't be null for the splits of partitioned table.");
+                            String partitionName =
+                                    checkNotNull(
+                                            split.getPartitionName(),
+                                            "partition name shouldn't be null for the splits of partitioned table.");
+                            assignedOrPendingPartitions.add(
+                                    new PartitionInfo(partitionId, partitionName));
+                        });
+
+        assignedOrPendingPartitions.forEach(dedupOrMarkAsRemoved);
 
         if (!removedPartitionIds.isEmpty()) {
             LOG.info("Discovered removed partitions: {}", removedPartitionIds);
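
This is the root-cause fix on the enumerator side. Previously, only partitions whose splits were already assigned were fed to dedupOrMarkAsRemoved, so a partition whose splits were still pending assignment looked removed on the next discovery pass and could trigger an unsubscribe, leaving the LogScanner with no buckets (the updated enumerator test below asserts that no RemovePartitionEvent is produced while splits are pending). Both sources now land in one set before the dedup/removal pass; the resulting shape, condensed with the checkNotNull guards elided:

    Set<PartitionInfo> assignedOrPendingPartitions = new HashSet<>();
    assignedPartitions.forEach(
            (id, name) -> assignedOrPendingPartitions.add(new PartitionInfo(id, name)));
    pendingSplitAssignment.values().stream()
            .flatMap(Collection::stream)
            .forEach(
                    split ->
                            assignedOrPendingPartitions.add(
                                    new PartitionInfo(
                                            split.getTableBucket().getPartitionId(),
                                            split.getPartitionName())));
    assignedOrPendingPartitions.forEach(dedupOrMarkAsRemoved);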
@@ -55,18 +55,9 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds<RecordAndPo
     private @Nullable TableBucket currentTableBucket;
     private @Nullable Long currentSplitStoppingOffset;
 
-    // for multiple splits
-    public FlinkRecordsWithSplitIds(
-            Map<String, CloseableIterator<RecordAndPos>> splitRecords,
-            Iterator<String> splitIterator,
-            Iterator<TableBucket> tableBucketIterator,
+    public static FlinkRecordsWithSplitIds emptyRecords(
             FlinkSourceReaderMetrics flinkSourceReaderMetrics) {
-        this(
-                splitRecords,
-                splitIterator,
-                tableBucketIterator,
-                new HashSet<>(),
-                flinkSourceReaderMetrics);
+        return new FlinkRecordsWithSplitIds(Collections.emptySet(), flinkSourceReaderMetrics);
     }
 
     // only for single split
@@ -144,9 +144,15 @@ public RecordsWithSplitIds<RecordAndPos> fetch() throws IOException {
         } else {
             // may need to finish empty log splits
             if (!emptyLogSplits.isEmpty()) {
+                FlinkRecordsWithSplitIds records =
+                        new FlinkRecordsWithSplitIds(emptyLogSplits, flinkSourceReaderMetrics);
                 emptyLogSplits.clear();
-                return new FlinkRecordsWithSplitIds(emptyLogSplits, flinkSourceReaderMetrics);
+                return records;
             } else {
+                // if no buckets are subscribed yet, just return empty records
+                if (subscribedBuckets.isEmpty()) {
+                    return FlinkRecordsWithSplitIds.emptyRecords(flinkSourceReaderMetrics);
+                }
                 ScanRecords scanRecords = logScanner.poll(POLL_TIMEOUT);
                 return forLogRecords(scanRecords);
             }
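
This hunk also folds in an ordering fix: the old code cleared emptyLogSplits and then built the return value from the now-empty set, so the batch could never report those splits as finished. Constructing first and clearing afterwards avoids that, provided the constructor copies or immediately consumes the set it is given. A self-contained illustration of the aliasing hazard (Batch is a stand-in, not the real class):

    import java.util.HashSet;
    import java.util.Set;

    class Batch { // stand-in for FlinkRecordsWithSplitIds
        final Set<String> finishedSplits;
        Batch(Set<String> finishedSplits) {
            this.finishedSplits = new HashSet<>(finishedSplits); // defensive copy
        }
    }

    public class ConstructThenClear {
        public static void main(String[] args) {
            Set<String> emptyLogSplits = new HashSet<>();
            emptyLogSplits.add("log-split-0");
            Batch batch = new Batch(emptyLogSplits); // capture before clearing
            emptyLogSplits.clear();
            System.out.println(batch.finishedSplits); // [log-split-0], nothing lost
        }
    }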
@@ -405,19 +405,25 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa
         Map<Long, String> partitionNameByIds =
                 waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
         enumerator.start();
-        // register all readers
-        for (int i = 0; i < numSubtasks; i++) {
-            registerReader(context, enumerator, i);
-        }
 
-        // invoke partition discovery callable again and there should assignments.
+        // invoke partition discovery callable again and there should be pending assignments.
         runPeriodicPartitionDiscovery(context);
 
+        // register two readers
+        registerReader(context, enumerator, 0);
+        registerReader(context, enumerator, 1);
+
+        // invoke partition discovery callable again, shouldn't produce RemovePartitionEvent.
+        runPeriodicPartitionDiscovery(context);
+        assertThat(context.getSentSourceEvent()).isEmpty();
+
+        // now, register the third reader
+        registerReader(context, enumerator, 2);
+
         // check the assignments
         Map<Integer, List<SourceSplitBase>> expectedAssignment =
                 expectAssignments(enumerator, tableId, partitionNameByIds);
-        Map<Integer, List<SourceSplitBase>> actualAssignments =
-                getLastReadersAssignments(context);
+        Map<Integer, List<SourceSplitBase>> actualAssignments = getReadersAssignments(context);
         checkAssignmentIgnoreOrder(actualAssignments, expectedAssignment);
 
         // now, create a new partition and runPeriodicPartitionDiscovery again,
@@ -268,6 +268,26 @@ void testHandleMixSnapshotLogSplitChangesAndFetch() throws Exception {
         }
     }
 
+    @Test
+    void testNoSubscribedBucket() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "test-no-subscribe-bucket-table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .build();
+        final TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1).build();
+        createTable(tablePath, tableDescriptor);
+
+        try (FlinkSourceSplitReader splitReader =
+                createSplitReader(tablePath, schema.toRowType())) {
+            // fetch shouldn't throw an exception
+            RecordsWithSplitIds<RecordAndPos> records = splitReader.fetch();
+            assertThat(records.nextSplit()).isNull();
+        }
+    }
+
     // ------------------
 
     private void assignSplitsAndFetchUntilRetrieveRecords(
@@ -591,26 +591,28 @@ private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionIn
                     }
                 };
 
+        Set<PartitionInfo> assignedOrPendingPartitions = new HashSet<>();
         assignedPartitions.forEach(
                 (partitionId, partitionName) ->
-                        dedupOrMarkAsRemoved.accept(new PartitionInfo(partitionId, partitionName)));
+                        assignedOrPendingPartitions.add(
+                                new PartitionInfo(partitionId, partitionName)));
+        pendingSplitAssignment.values().stream()
+                .flatMap(Collection::stream)
+                .forEach(
+                        split -> {
+                            long partitionId =
+                                    checkNotNull(
+                                            split.getTableBucket().getPartitionId(),
+                                            "partition id shouldn't be null for the splits of partitioned table.");
+                            String partitionName =
+                                    checkNotNull(
+                                            split.getPartitionName(),
+                                            "partition name shouldn't be null for the splits of partitioned table.");
+                            assignedOrPendingPartitions.add(
+                                    new PartitionInfo(partitionId, partitionName));
+                        });
 
-        pendingSplitAssignment.forEach(
-                (reader, splits) ->
-                        splits.forEach(
-                                split -> {
-                                    long partitionId =
-                                            checkNotNull(
-                                                    split.getTableBucket().getPartitionId(),
-                                                    "partition id shouldn't be null for the splits of partitioned table.");
-                                    String partitionName =
-                                            checkNotNull(
-                                                    split.getPartitionName(),
-                                                    "partition name shouldn't be null for the splits of partitioned table.");
-                                    PartitionInfo partitionInfo =
-                                            new PartitionInfo(partitionId, partitionName);
-                                    dedupOrMarkAsRemoved.accept(partitionInfo);
-                                }));
+        assignedOrPendingPartitions.forEach(dedupOrMarkAsRemoved);
 
         if (!removedPartitionIds.isEmpty()) {
             LOG.info("Discovered removed partitions: {}", removedPartitionIds);
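
The same collect-then-dedup pass appears in this second enumerator. One detail both copies rely on: assignedOrPendingPartitions is a HashSet, so a partition reachable both through assignedPartitions and through pending splits is offered to dedupOrMarkAsRemoved only once, which presumes PartitionInfo implements value-based equals/hashCode. A toy check of that assumption:

    Set<PartitionInfo> seen = new HashSet<>();
    seen.add(new PartitionInfo(1L, "p20250102"));
    seen.add(new PartitionInfo(1L, "p20250102")); // same partition, seen again via pending splits
    // seen.size() == 1 only if PartitionInfo defines equals/hashCode by value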
