Skip to content

Commit d5c05e1

Browse files
author
xiaohongbo
committed
[spark] Update Spark catalog tableExists and Hive catalog dropTable logic if table in hms but not in fs
1 parent ffb791c commit d5c05e1

File tree

5 files changed

+156
-11
lines changed

5 files changed

+156
-11
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

+6
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,12 @@ public TableNotExistException(Identifier identifier, Throwable cause) {
534534
this.identifier = identifier;
535535
}
536536

537+
protected TableNotExistException(
538+
String customFormat, Identifier identifier, Throwable cause) {
539+
super(String.format(customFormat, identifier.getFullName()), cause);
540+
this.identifier = identifier;
541+
}
542+
537543
public Identifier identifier() {
538544
return identifier;
539545
}

paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java

+79-11
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,8 @@ public void createPartitions(Identifier identifier, List<Map<String, String>> pa
362362
String dataFilePath =
363363
hmsTable.getParameters().containsKey(DATA_FILE_PATH_DIRECTORY.key())
364364
? sd.getLocation()
365-
+ "/"
366-
+ hmsTable.getParameters().get(DATA_FILE_PATH_DIRECTORY.key())
365+
+ "/"
366+
+ hmsTable.getParameters().get(DATA_FILE_PATH_DIRECTORY.key())
367367
: sd.getLocation();
368368
List<Partition> hivePartitions = new ArrayList<>();
369369
for (Map<String, String> partitionSpec : partitions) {
@@ -373,7 +373,7 @@ public void createPartitions(Identifier identifier, List<Map<String, String>> pa
373373
dataFilePath
374374
+ "/"
375375
+ PartitionPathUtils.generatePartitionPath(
376-
new LinkedHashMap<>(partitionSpec)));
376+
new LinkedHashMap<>(partitionSpec)));
377377
hivePartition.setDbName(identifier.getDatabaseName());
378378
hivePartition.setTableName(identifier.getTableName());
379379
hivePartition.setValues(new ArrayList<>(partitionSpec.values()));
@@ -692,9 +692,9 @@ private TableSchema loadTableSchema(Identifier identifier, Table table)
692692
throws TableNotExistException {
693693
if (isPaimonTable(table)) {
694694
return tableSchemaInFileSystem(
695-
getTableLocation(identifier, table),
696-
identifier.getBranchNameOrDefault())
697-
.orElseThrow(() -> new TableNotExistException(identifier));
695+
getTableLocation(identifier, table),
696+
identifier.getBranchNameOrDefault())
697+
.orElseThrow(() -> new HmsTableNotExistInFsException(identifier));
698698
}
699699

700700
if (!formatTableDisabled()) {
@@ -705,7 +705,7 @@ private TableSchema loadTableSchema(Identifier identifier, Table table)
705705
}
706706
}
707707

708-
throw new TableNotExistException(identifier);
708+
throw new HmsTableNotExistInFsException(identifier);
709709
}
710710

711711
@Override
@@ -888,6 +888,31 @@ private boolean usingExternalTable(Map<String, String> tableOptions) {
888888
|| "TRUE".equalsIgnoreCase(externalPropValue);
889889
}
890890

891+
@Override
892+
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
893+
throws TableNotExistException {
894+
checkNotBranch(identifier, "dropTable");
895+
checkNotSystemTable(identifier, "dropTable");
896+
897+
try {
898+
getTable(identifier);
899+
} catch (TableNotExistException e) {
900+
if (e instanceof HmsTableNotExistInFsException) {
901+
LOG.warn(
902+
"Table {}.{} exists in Hive metastore, but not exist in file system. will try to drop it in Hive metastore only.",
903+
identifier.getDatabaseName(),
904+
identifier.getTableName());
905+
dropHmsTableQuietly(identifier);
906+
}
907+
if (ignoreIfNotExists) {
908+
return;
909+
}
910+
throw new TableNotExistException(identifier);
911+
}
912+
913+
dropTableImpl(identifier);
914+
}
915+
891916
@Override
892917
protected void dropTableImpl(Identifier identifier) {
893918
try {
@@ -1080,9 +1105,9 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
10801105
// first commit changes to underlying files
10811106
schema = runWithLock(identifier, () -> schemaManager.commitChanges(changes));
10821107
} catch (TableNotExistException
1083-
| ColumnAlreadyExistException
1084-
| ColumnNotExistException
1085-
| RuntimeException e) {
1108+
| ColumnAlreadyExistException
1109+
| ColumnNotExistException
1110+
| RuntimeException e) {
10861111
throw e;
10871112
} catch (Exception e) {
10881113
throw new RuntimeException("Failed to alter table " + identifier.getFullName(), e);
@@ -1648,4 +1673,47 @@ public int getBatchGetTableSize() {
16481673
return DEFAULT_TABLE_BATCH_SIZE;
16491674
}
16501675
}
1651-
}
1676+
1677+
private void dropHmsTableQuietly(Identifier identifier) throws TableNotExistException {
1678+
try {
1679+
boolean externalTable = isExternalTable(getHmsTable(identifier));
1680+
clients.execute(
1681+
client ->
1682+
client.dropTable(
1683+
identifier.getDatabaseName(),
1684+
identifier.getTableName(),
1685+
!externalTable,
1686+
false,
1687+
true));
1688+
} catch (InterruptedException e) {
1689+
Thread.currentThread().interrupt();
1690+
LOG.warn(
1691+
"Interrupted in call to drop hms table {} which does not exist in file system",
1692+
identifier.getFullName(),
1693+
e);
1694+
} catch (Exception e) {
1695+
LOG.warn(
1696+
"Failed to drop hms table {} which does not exist in file system",
1697+
identifier.getFullName(),
1698+
e);
1699+
}
1700+
}
1701+
1702+
/**
1703+
* Exception for trying to operate on a table which exists in Hive metastore but doesn't exist
1704+
* in file system.
1705+
*/
1706+
public static class HmsTableNotExistInFsException extends TableNotExistException {
1707+
1708+
protected static final String MSG =
1709+
"Table %s exists in Hive metastore but does not exist in file system.";
1710+
1711+
public HmsTableNotExistInFsException(Identifier identifier) {
1712+
this(identifier, null);
1713+
}
1714+
1715+
public HmsTableNotExistInFsException(Identifier identifier, Throwable cause) {
1716+
super(MSG, identifier, cause);
1717+
}
1718+
}
1719+
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java

+23
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import org.apache.paimon.catalog.CatalogContext;
2424
import org.apache.paimon.catalog.CatalogFactory;
2525
import org.apache.paimon.catalog.PropertyChange;
26+
import org.apache.paimon.hive.HiveCatalog;
2627
import org.apache.paimon.options.Options;
2728
import org.apache.paimon.schema.Schema;
2829
import org.apache.paimon.schema.SchemaChange;
30+
import org.apache.paimon.spark.analysis.SparkTableNotExistInFsException;
2931
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
3032
import org.apache.paimon.spark.catalog.SupportFunction;
3133
import org.apache.paimon.spark.catalog.SupportView;
@@ -246,6 +248,21 @@ public SparkTable loadTable(Identifier ident, String version) throws NoSuchTable
246248
}
247249
}
248250

251+
@Override
252+
public boolean tableExists(Identifier ident) {
253+
try {
254+
return this.loadTable(ident) != null;
255+
} catch (SparkTableNotExistInFsException e) {
256+
LOG.warn(
257+
"Table {}.{} exists in Hive metastore, but not in file system, we think table exists in this case",
258+
ident.namespace(),
259+
ident.name());
260+
return true;
261+
} catch (NoSuchTableException e) {
262+
return false;
263+
}
264+
}
265+
249266
/**
250267
* Do not annotate with <code>@override</code> here to maintain compatibility with Spark 3.2-.
251268
*
@@ -448,6 +465,12 @@ protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
448465
copyWithSQLConf(
449466
paimonTable, catalogName, toIdentifier(ident), extraOptions));
450467
}
468+
} catch (HiveCatalog.HmsTableNotExistInFsException e) {
469+
LOG.warn(
470+
"Table {}.{} exists in Hive metastore, but not exist in file system.",
471+
ident.namespace(),
472+
ident.name());
473+
throw new SparkTableNotExistInFsException(ident);
451474
} catch (Catalog.TableNotExistException e) {
452475
throw new NoSuchTableException(ident);
453476
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java

+16
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.catalog.Catalog;
2222
import org.apache.paimon.hive.HiveCatalogOptions;
2323
import org.apache.paimon.options.CatalogOptions;
24+
import org.apache.paimon.spark.analysis.SparkTableNotExistInFsException;
2425
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
2526
import org.apache.paimon.spark.util.SQLConfUtils;
2627
import org.apache.paimon.utils.Preconditions;
@@ -154,6 +155,21 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti
154155
return asTableCatalog().listTables(namespace);
155156
}
156157

158+
@Override
159+
public boolean tableExists(Identifier ident) {
160+
try {
161+
return this.loadTable(ident) != null;
162+
} catch (SparkTableNotExistInFsException e) {
163+
LOG.warn(
164+
"Table {}.{} exists in Hive metastore, but not in file system, we think table exists in this case",
165+
ident.namespace(),
166+
ident.name());
167+
return true;
168+
} catch (NoSuchTableException e) {
169+
return false;
170+
}
171+
}
172+
157173
@Override
158174
public Table loadTable(Identifier ident) throws NoSuchTableException {
159175
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.analysis;
20+
21+
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
22+
import org.apache.spark.sql.connector.catalog.Identifier;
23+
24+
/**
25+
* Thrown by spark catalog when a table exists in Hive Metastore but cannot be found in file system.
26+
*/
27+
public class SparkTableNotExistInFsException extends NoSuchTableException {
28+
29+
public SparkTableNotExistInFsException(Identifier ident) {
30+
super(ident);
31+
}
32+
}

0 commit comments

Comments
 (0)