Skip to content

Commit ce0932d

Browse files
committed
fixup! Fix failure reading checkpoint created by CREATE OR REPLACE with different schema
1 parent c03610d commit ce0932d

File tree

4 files changed

+38
-24
lines changed

4 files changed

+38
-24
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1376,8 +1376,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
13761376
transactionLogWriter.flush();
13771377

13781378
if (replaceExistingTable) {
1379-
verify(commitVersion > tableHandle.getReadVersion());
1380-
writeCheckpointMandatory(session, schemaTableName, location, tableHandle.toCredentialsHandle(), commitVersion);
1379+
writeCheckpoint(session, schemaTableName, location, tableHandle.toCredentialsHandle(), commitVersion);
13811380
}
13821381
}
13831382
}
@@ -1749,7 +1748,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
17491748
writeCommitted = true;
17501749

17511750
if (handle.replace() && handle.readVersion().isPresent()) {
1752-
writeCheckpointMandatory(session, schemaTableName, location, handle.toCredentialsHandle(), commitVersion);
1751+
writeCheckpoint(session, schemaTableName, location, handle.toCredentialsHandle(), commitVersion);
17531752
}
17541753

17551754
if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty()) {
@@ -3183,7 +3182,7 @@ private void writeCheckpointIfNeeded(
31833182
// This does not pose correctness issue but may be confusing if someone looks into transaction log.
31843183
// To fix that we should allow for getting snapshot for given version.
31853184

3186-
writeCheckpointMandatory(session, table, tableLocation, credentialsHandle, newVersion);
3185+
writeCheckpoint(session, table, tableLocation, credentialsHandle, newVersion);
31873186
}
31883187
catch (Exception e) {
31893188
// We can't fail here as transaction was already committed, in case of INSERT this could result
@@ -3192,7 +3191,7 @@ private void writeCheckpointIfNeeded(
31923191
}
31933192
}
31943193

3195-
private void writeCheckpointMandatory(ConnectorSession session, SchemaTableName table, String tableLocation, VendedCredentialsHandle credentialsHandle, long newVersion)
3194+
private void writeCheckpoint(ConnectorSession session, SchemaTableName table, String tableLocation, VendedCredentialsHandle credentialsHandle, long newVersion)
31963195
throws IOException
31973196
{
31983197
TransactionLogReader transactionLogReader = new FileSystemTransactionLogReader(tableLocation, credentialsHandle, fileSystemFactory);

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -609,14 +609,18 @@ public void testCreateOrReplaceTable()
609609

610610
assertFileSystemAccesses("CREATE OR REPLACE TABLE test_create_or_replace (id VARCHAR, age INT)",
611611
ImmutableMultiset.<CacheOperation>builder()
612-
.add(new CacheOperation("Alluxio.readCached", "00000000000000000000.json", 0, 821))
612+
.addCopies(new CacheOperation("Alluxio.readCached", "00000000000000000000.json", 0, 821), 2)
613613
.add(new CacheOperation("Alluxio.readExternalStream", "00000000000000000000.json", 0, 821))
614614
.add(new CacheOperation("InputFile.newStream", "00000000000000000000.json"))
615615
.add(new CacheOperation("Alluxio.writeCache", "00000000000000000000.json", 0, 821))
616-
.add(new CacheOperation("InputFile.length", "00000000000000000000.json"))
616+
.addCopies(new CacheOperation("InputFile.length", "00000000000000000000.json"), 2)
617617
.add(new CacheOperation("InputFile.exists", "00000000000000000001.json"))
618-
.add(new CacheOperation("InputFile.length", "00000000000000000001.json"))
619-
.add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
618+
.addCopies(new CacheOperation("InputFile.length", "00000000000000000001.json"), 2)
619+
.add(new CacheOperation("Alluxio.writeCache", "00000000000000000001.json", 0, 821))
620+
.add(new CacheOperation("Alluxio.readExternalStream", "00000000000000000001.json", 0, 821))
621+
.add(new CacheOperation("Alluxio.readCached", "00000000000000000001.json", 0, 821))
622+
.add(new CacheOperation("InputFile.newStream", "00000000000000000001.json"))
623+
.addCopies(new CacheOperation("InputFile.newStream", "_last_checkpoint"), 2)
620624
.add(new CacheOperation("InputFile.exists", "extended_stats.json"))
621625
.build());
622626
assertUpdate("DROP TABLE test_create_or_replace");
@@ -636,16 +640,20 @@ public void testCreateOrReplaceTableAsSelect()
636640
assertFileSystemAccesses(
637641
"CREATE OR REPLACE TABLE test_create_or_replace_as_select AS SELECT 1 col_name",
638642
ImmutableMultiset.<CacheOperation>builder()
639-
.add(new CacheOperation("Alluxio.readCached", "00000000000000000000.json", 0, 1063))
643+
.addCopies(new CacheOperation("Alluxio.readCached", "00000000000000000000.json", 0, 1063), 2)
640644
.add(new CacheOperation("Alluxio.readExternalStream", "00000000000000000000.json", 0, 1063))
641645
.add(new CacheOperation("Alluxio.writeCache", "00000000000000000000.json", 0, 1063))
642646
.add(new CacheOperation("InputFile.newStream", "00000000000000000000.json"))
643-
.add(new CacheOperation("InputFile.length", "00000000000000000000.json"))
644-
.add(new CacheOperation("InputFile.length", "00000000000000000001.json"))
647+
.addCopies(new CacheOperation("InputFile.length", "00000000000000000000.json"), 2)
648+
.addCopies(new CacheOperation("InputFile.length", "00000000000000000001.json"), 2)
645649
.add(new CacheOperation("InputFile.exists", "00000000000000000001.json"))
650+
.add(new CacheOperation("Alluxio.readCached", "00000000000000000001.json", 0, 1223))
651+
.add(new CacheOperation("Alluxio.writeCache", "00000000000000000001.json", 0, 1223))
652+
.add(new CacheOperation("Alluxio.readExternalStream", "00000000000000000001.json", 0, 1223))
653+
.add(new CacheOperation("InputFile.newStream", "00000000000000000001.json"))
646654
.add(new CacheOperation("InputFile.exists", "extendeded_stats.json"))
647655
.add(new CacheOperation("InputFile.newStream", "extended_stats.json"))
648-
.add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
656+
.addCopies(new CacheOperation("InputFile.newStream", "_last_checkpoint"), 2)
649657
.build());
650658

651659
assertUpdate("DROP TABLE test_create_or_replace_as_select");

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,9 @@ void testCreateReplaceReadingCheckpointWithDifferentSchema()
293293
assertUpdate("INSERT INTO " + table.getName() + " VALUES (2, 'bb')", 1);
294294

295295
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (x int, y int) with (checkpoint_interval = 2)");
296+
assertQueryReturnsEmptyResult("TABLE " + table.getName());
296297
assertUpdate("INSERT INTO " + table.getName() + " VALUES (3, 3)", 1);
297-
assertThat(query("SELECT * FROM " + table.getName()))
298+
assertThat(query("TABLE " + table.getName()))
298299
.matches("VALUES (3, 3)");
299300

300301
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (z varchar)");
@@ -310,8 +311,8 @@ void testCreateReplaceReadingCheckpointWithDifferentSchemaCTAS()
310311
// generate a checkpoint
311312
assertUpdate("INSERT INTO " + table.getName() + " VALUES (2, 'bb')", 1);
312313

313-
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT 3 AS x, 3 AS y", 1);
314-
assertThat(query("SELECT * FROM " + table.getName()))
314+
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT 3 AS x, 3 AS y", 1);
315+
assertThat(query("TABLE " + table.getName()))
315316
.matches("VALUES (3, 3)");
316317

317318
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT 'test' AS z", 1);

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,16 @@ public void testCreateOrReplaceTable()
143143

144144
assertFileSystemAccesses("CREATE OR REPLACE TABLE test_create_or_replace (id VARCHAR, age INT)",
145145
ImmutableMultiset.<FileOperation>builder()
146-
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"))
147-
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"))
146+
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), 2)
147+
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"), 2)
148148
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "OutputFile.createOrOverwrite"))
149-
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
149+
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"), 2)
150150
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"))
151+
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"))
151152
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.exists"))
152-
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"))
153+
.add(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.parquet", "OutputFile.create"))
154+
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), 2)
155+
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "OutputFile.createOrOverwrite"))
153156
.build());
154157
assertUpdate("DROP TABLE test_create_or_replace");
155158
}
@@ -174,12 +177,15 @@ public void testCreateOrReplaceTableAsSelect()
174177
.add(new FileOperation(STARBURST_EXTENDED_STATS_JSON, "extendeded_stats.json", "InputFile.exists"))
175178
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
176179
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "OutputFile.createOrOverwrite"))
177-
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"))
178-
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"))
180+
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream"), 2)
181+
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"), 2)
182+
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream"))
179183
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "OutputFile.createOrOverwrite"))
180-
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
184+
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"), 2)
185+
.add(new FileOperation(CHECKPOINT, "00000000000000000001.checkpoint.parquet", "OutputFile.create"))
181186
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.exists"))
182-
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"))
187+
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream"), 2)
188+
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "OutputFile.createOrOverwrite"))
183189
.add(new FileOperation(DATA, "no partition", "OutputFile.create"))
184190
.build());
185191

0 commit comments

Comments
 (0)