@@ -18,18 +18,27 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.catalog.Identifier
 import org.apache.paimon.fs.Path
 import org.apache.paimon.hive.HiveMetastoreClient
-import org.apache.paimon.spark.PaimonHiveTestBase
-import org.apache.paimon.spark.catalog.WithPaimonCatalog
+import org.apache.paimon.spark.PaimonHiveTestBase.hiveUri
+import org.apache.paimon.spark.{PaimonHiveTestBase, SparkCatalog}
 import org.apache.paimon.table.FileStoreTable
-
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.junit.jupiter.api.Assertions
 
 abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
 
+  protected val paimonHiveNoCacheCatalogName: String = "paimon_hive_no_cache"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName.cache-enabled", "false")
+      .set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName", classOf[SparkCatalog].getName)
+      .set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName.metastore", "hive")
+      .set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName.warehouse", tempHiveDBDir.getCanonicalPath)
+      .set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName.uri", hiveUri)
+  }
+
   test("Paimon DDL with hive catalog: create database with location and comment") {
     Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
       catalogName =>
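The override above pre-registers an extra Hive-backed Paimon catalog with metadata caching disabled, so the cache-off path can be tested on the shared session instead of building a custom SparkSession per cache setting. As a minimal standalone sketch using the same configuration keys (the catalog name, warehouse path, and metastore URI here are placeholders, not part of this patch):

// Minimal sketch, not part of the patch: wiring the same no-cache Hive-backed
// Paimon catalog onto a standalone SparkSession. Warehouse path and metastore
// URI below are placeholders.
import org.apache.spark.sql.SparkSession

object NoCacheCatalogExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .config("spark.sql.catalog.paimon_hive_no_cache", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon_hive_no_cache.metastore", "hive")
      .config("spark.sql.catalog.paimon_hive_no_cache.warehouse", "/tmp/paimon") // placeholder path
      .config("spark.sql.catalog.paimon_hive_no_cache.uri", "thrift://localhost:9083") // placeholder URI
      .config("spark.sql.catalog.paimon_hive_no_cache.cache-enabled", "false")
      .getOrCreate()

    // Switch to the new catalog and verify it responds.
    spark.sql("USE paimon_hive_no_cache")
    spark.sql("SHOW TABLES").show()
    spark.stop()
  }
}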
@@ -624,47 +633,26 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
   }
 
   test("Paimon DDL with hive catalog: drop table which location has been deleted") {
-    spark.close()
-
-    Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
+    Seq("paimon", sparkCatalogName, paimonHiveCatalogName, paimonHiveNoCacheCatalogName).foreach {
       catalogName =>
-        Seq(false, true).foreach {
-          cacheEnabled =>
-            val customSpark = SparkSession
-              .builder()
-              .master("local[2]")
-              .config(sparkConf)
-              .config(s"spark.sql.catalog.$catalogName.cache-enabled", cacheEnabled)
-              .getOrCreate()
-
-            customSpark.sql(s"USE $catalogName")
-            customSpark.sql("CREATE DATABASE IF NOT EXISTS paimon_db")
-
-            withDatabase("paimon_db") {
-              customSpark.sql("USE paimon_db")
-              customSpark.sql("CREATE TABLE t USING paimon")
-              val currentCatalog = customSpark.sessionState.catalogManager.currentCatalog
-                .asInstanceOf[WithPaimonCatalog]
-                .paimonCatalog()
-              val table = currentCatalog
-                .getTable(Identifier.create("paimon_db", "t"))
-                .asInstanceOf[FileStoreTable]
-              table.fileIO().delete(table.location(), true)
-              if (catalogName.equals("paimon")) {
-                // Filesystem catalog determines whether a table exists based on table location
-                assert(customSpark.sql("SHOW TABLES").count() == 0)
-              } else {
-                // Hive catalog determines whether a table exists based on metadata in hms
-                assert(customSpark.sql("SHOW TABLES").count() == 1)
-              }
-              customSpark.sql("DROP TABLE IF EXISTS t")
-              assert(customSpark.sql("SHOW TABLES").count() == 0)
-            }
-
-            customSpark.close()
+        spark.sql(s"USE $catalogName")
+        withDatabase("paimon_db") {
+          spark.sql(s"CREATE DATABASE paimon_db")
+          spark.sql(s"USE paimon_db")
+          spark.sql("CREATE TABLE t USING paimon")
+          val table = loadTable("paimon_db", "t")
+          table.fileIO().delete(table.location(), true)
+          if (catalogName.equals("paimon")) {
+            // Filesystem catalog determines whether a table exists based on table location
+            assert(spark.sql("SHOW TABLES").count() == 0)
+          } else {
+            // Hive catalog determines whether a table exists based on metadata in hms
+            assert(spark.sql("SHOW TABLES").count() == 1)
+          }
+          spark.sql("DROP TABLE IF EXISTS t")
+          assert(spark.sql("SHOW TABLES").count() == 0)
         }
     }
-    reset()
   }
 
   def getDatabaseProp(dbName: String, propertyName: String): String = {
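The rewritten test also calls a `loadTable` helper instead of resolving the Paimon catalog inline, which is why the `Identifier` and `WithPaimonCatalog` imports can be dropped from this file. The actual helper lives in the shared Paimon Spark test base, so treat the following as an illustrative reconstruction from the removed lines only:

// Illustrative sketch of a loadTable helper, reconstructed from the removed
// inline code above; the real helper is defined in the Paimon Spark test base.
import org.apache.paimon.catalog.Identifier
import org.apache.paimon.spark.catalog.WithPaimonCatalog
import org.apache.paimon.table.FileStoreTable

def loadTable(dbName: String, tableName: String): FileStoreTable = {
  // Resolve the Paimon catalog behind the session's current Spark catalog...
  val paimonCatalog = spark.sessionState.catalogManager.currentCatalog
    .asInstanceOf[WithPaimonCatalog]
    .paimonCatalog()
  // ...then load the table as a FileStoreTable, as the old test did inline.
  paimonCatalog
    .getTable(Identifier.create(dbName, tableName))
    .asInstanceOf[FileStoreTable]
}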