Skip to content

Commit

Permalink
add integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
liangyouze committed Jan 17, 2025
1 parent dc48d0a commit 8b20d04
Show file tree
Hide file tree
Showing 21 changed files with 592 additions and 19 deletions.
4 changes: 2 additions & 2 deletions docs/spark-connector/spark-catalog-jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ The Apache Gravitino Spark connector offers the capability to read JDBC tables,
:::

#### Not supported operations:
- `CREATE DATABASE`
- `UPDATE`
- `DELETE`
- `TRUNCATE`
Expand All @@ -32,7 +31,8 @@ The Apache Gravitino Spark connector offers the capability to read JDBC tables,
```sql
-- Suppose mysql_a is the mysql catalog name managed by Gravitino
USE mysql_a;
-- Suppose mydatabase is in your mysql

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

CREATE TABLE IF NOT EXISTS employee (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

package org.apache.gravitino.spark.connector.jdbc;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.catalog.BaseCatalog;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTable;
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

Expand All @@ -51,7 +55,14 @@ protected Table createSparkTable(
PropertiesConverter propertiesConverter,
SparkTransformConverter sparkTransformConverter,
SparkTypeConverter sparkTypeConverter) {
return sparkTable;
return new SparkJdbcTable(
identifier,
gravitinoTable,
(JDBCTable) sparkTable,
(JDBCTableCatalog) sparkCatalog,
propertiesConverter,
sparkTransformConverter,
sparkTypeConverter);
}

@Override
Expand All @@ -63,4 +74,16 @@ protected PropertiesConverter getPropertiesConverter() {
protected SparkTransformConverter getSparkTransformConverter() {
  // A new converter is built on every call. NOTE(review): the meaning of the `false`
  // argument is defined by SparkTransformConverter's constructor -- confirm there.
  SparkTransformConverter transformConverter = new SparkTransformConverter(false);
  return transformConverter;
}

/** Returns a freshly constructed {@link SparkJdbcTypeConverter} for this JDBC catalog. */
@Override
protected SparkTypeConverter getSparkTypeConverter() {
  SparkTypeConverter jdbcTypeConverter = new SparkJdbcTypeConverter();
  return jdbcTypeConverter;
}

/**
 * Creates a namespace, forwarding only the comment property to the parent implementation.
 *
 * <p>Every metadata entry whose key is not {@link SupportsNamespaces#PROP_COMMENT} is dropped
 * before delegating. NOTE(review): presumably because the underlying JDBC catalog rejects other
 * namespace properties -- confirm against the parent catalog.
 *
 * @param namespace the multi-part name of the namespace to create
 * @param metadata the properties requested by the caller; only the comment entry is passed on
 * @throws NamespaceAlreadyExistsException propagated from the parent implementation
 */
@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
    throws NamespaceAlreadyExistsException {
  // Constant-first equals keeps the predicate null-safe if the map ever holds a null key.
  Map<String, String> commentOnly =
      Maps.filterKeys(metadata, key -> SupportsNamespaces.PROP_COMMENT.equals(key));
  super.createNamespace(namespace, commentOnly);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.spark.connector.jdbc;

import java.util.Map;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTable;
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog;
import org.apache.spark.sql.types.StructType;

/**
 * A Spark {@link JDBCTable} whose name, schema and properties are answered by a {@link
 * GravitinoTableInfoHelper} built from the Gravitino table definition.
 *
 * <p>The parent {@link JDBCTable} is initialised from the wrapped table's schema and JDBC
 * options.
 */
public class SparkJdbcTable extends JDBCTable {

  // Assigned exactly once in the constructor, so it is declared final.
  private final GravitinoTableInfoHelper gravitinoTableInfoHelper;

  /**
   * Creates a table that wraps {@code jdbcTable} and exposes Gravitino metadata.
   *
   * @param identifier the Spark identifier of the table
   * @param gravitinoTable the Gravitino table handed to the info helper
   * @param jdbcTable the Spark JDBC table that supplies the parent's schema and JDBC options
   * @param jdbcTableCatalog the owning Spark JDBC catalog; currently not used by this constructor
   * @param propertiesConverter passed through to the info helper
   * @param sparkTransformConverter passed through to the info helper
   * @param sparkTypeConverter passed through to the info helper
   */
  public SparkJdbcTable(
      Identifier identifier,
      Table gravitinoTable,
      JDBCTable jdbcTable,
      JDBCTableCatalog jdbcTableCatalog,
      PropertiesConverter propertiesConverter,
      SparkTransformConverter sparkTransformConverter,
      SparkTypeConverter sparkTypeConverter) {
    super(identifier, jdbcTable.schema(), jdbcTable.jdbcOptions());
    // NOTE(review): the leading `false` flag is interpreted by GravitinoTableInfoHelper;
    // confirm its meaning there.
    this.gravitinoTableInfoHelper =
        new GravitinoTableInfoHelper(
            false,
            identifier,
            gravitinoTable,
            propertiesConverter,
            sparkTransformConverter,
            sparkTypeConverter);
  }

  /** Returns the table name reported by the Gravitino info helper. */
  @Override
  public String name() {
    return gravitinoTableInfoHelper.name();
  }

  /** Returns the schema reported by the Gravitino info helper. */
  @Override
  @SuppressWarnings("deprecation")
  public StructType schema() {
    return gravitinoTableInfoHelper.schema();
  }

  /** Returns the table properties reported by the Gravitino info helper. */
  @Override
  public Map<String, String> properties() {
    return gravitinoTableInfoHelper.properties();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.spark.connector.jdbc;

import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

/**
 * Type converter for JDBC catalogs: Gravitino varchar types become Spark's string type, and every
 * other type is converted by {@link SparkTypeConverter}.
 */
public class SparkJdbcTypeConverter extends SparkTypeConverter {

  /**
   * Converts a Gravitino type to its Spark counterpart.
   *
   * @param gravitinoType the Gravitino type to convert
   * @return {@link DataTypes#StringType} for {@link Types.VarCharType}, otherwise the result of
   *     the parent conversion
   */
  @Override
  public DataType toSparkType(Type gravitinoType) {
    // Anything that is not a varchar takes the default conversion path.
    if (!(gravitinoType instanceof Types.VarCharType)) {
      return super.toSparkType(gravitinoType);
    }
    return DataTypes.StringType;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,22 @@ public class CatalogNameAdaptor {
"lakehouse-paimon-3.5",
"org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35");

// Maps a Spark "major.minor" version string to the fully qualified class name of the JDBC
// catalog implementation for that version.
private static final Map<String, String> jdbcCatalogNames =
    ImmutableMap.<String, String>builder()
        .put("3.3", "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark33")
        .put("3.4", "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark34")
        .put("3.5", "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark35")
        .build();

private static String sparkVersion() {
  // Reads SPARK_VERSION from the imported Scala package object.
  String version = package$.MODULE$.SPARK_VERSION();
  return version;
}

private static String getCatalogName(String provider, int majorVersion, int minorVersion) {
if (provider.startsWith("jdbc")) {
return "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalog";
return jdbcCatalogNames.get(String.format("%d.%d", majorVersion, minorVersion));
}
String key =
String.format("%s-%d.%d", provider.toLowerCase(Locale.ROOT), majorVersion, minorVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ private static String getRowLevelDeleteTableSql(

// Capability flag supplied by each concrete integration test.
// NOTE(review): presumably gates the REPLACE COLUMNS test -- confirm at the call site.
protected abstract boolean supportsReplaceColumns();

// Referenced by name from @EnabledIf("supportsProperties") on testCreateAndLoadSchema,
// testAlterSchema, testAlterTableSetAndRemoveProperty and testTableOptions.
protected abstract boolean supportsProperties();

// Referenced by name from @EnabledIf("supportsComplexType") on testComplexType.
protected abstract boolean supportsComplexType();

/**
 * Supplies the checker that the table tests in this class build their expectations on.
 *
 * <p>The default is {@code SparkTableInfoChecker.create()}. The method is protected and
 * overridable, presumably so a catalog-specific test can substitute its own checker.
 */
protected SparkTableInfoChecker getTableInfoChecker() {
  SparkTableInfoChecker checker = SparkTableInfoChecker.create();
  return checker;
}

// Use a custom database not the original default database because SparkCommonIT couldn't
// read&write data to tables in default database. The main reason is default database location is
// determined by `hive.metastore.warehouse.dir` in hive-site.xml which is local HDFS address
Expand Down Expand Up @@ -189,6 +197,7 @@ void testLoadCatalogs() {
}

@Test
@EnabledIf("supportsProperties")
protected void testCreateAndLoadSchema() {
String testDatabaseName = "t_create1";
dropDatabaseIfExists(testDatabaseName);
Expand Down Expand Up @@ -218,6 +227,7 @@ protected void testCreateAndLoadSchema() {
}

@Test
@EnabledIf("supportsProperties")
protected void testAlterSchema() {
String testDatabaseName = "t_alter";
dropDatabaseIfExists(testDatabaseName);
Expand Down Expand Up @@ -266,7 +276,7 @@ void testCreateSimpleTable() {
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
getTableInfoChecker()
.withName(tableName)
.withColumns(getSimpleTableColumn())
.withComment(null);
Expand All @@ -287,7 +297,7 @@ void testCreateTableWithDatabase() {
createSimpleTable(tableIdentifier);
SparkTableInfo tableInfo = getTableInfo(tableIdentifier);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create().withName(tableName).withColumns(getSimpleTableColumn());
getTableInfoChecker().withName(tableName).withColumns(getSimpleTableColumn());
checker.check(tableInfo);
checkTableReadWrite(tableInfo);

Expand All @@ -300,8 +310,7 @@ void testCreateTableWithDatabase() {
dropTableIfExists(tableName);
createSimpleTable(tableName);
tableInfo = getTableInfo(tableName);
checker =
SparkTableInfoChecker.create().withName(tableName).withColumns(getSimpleTableColumn());
checker = getTableInfoChecker().withName(tableName).withColumns(getSimpleTableColumn());
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
}
Expand All @@ -317,7 +326,7 @@ void testCreateTableWithComment() {
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
getTableInfoChecker()
.withName(tableName)
.withColumns(getSimpleTableColumn())
.withComment(tableComment);
Expand Down Expand Up @@ -396,6 +405,7 @@ void testListTable() {
}

@Test
@EnabledIf("supportsProperties")
void testAlterTableSetAndRemoveProperty() {
String tableName = "test_property";
dropTableIfExists(tableName);
Expand Down Expand Up @@ -425,8 +435,7 @@ void testAlterTableUpdateComment() {
"ALTER TABLE %s SET TBLPROPERTIES('%s'='%s')",
tableName, ConnectorConstants.COMMENT, comment));
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create().withName(tableName).withComment(comment);
SparkTableInfoChecker checker = getTableInfoChecker().withName(tableName).withComment(comment);
checker.check(tableInfo);
}

Expand Down Expand Up @@ -593,6 +602,7 @@ protected void testAlterTableReplaceColumns() {
}

@Test
@EnabledIf("supportsComplexType")
void testComplexType() {
String tableName = "complex_type_table";
dropTableIfExists(tableName);
Expand Down Expand Up @@ -632,7 +642,7 @@ void testCreateDatasourceFormatPartitionTable() {
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
getTableInfoChecker()
.withName(tableName)
.withColumns(getSimpleTableColumn())
.withIdentifyPartition(Arrays.asList("name", "age"));
Expand All @@ -652,7 +662,7 @@ void testCreateBucketTable() {
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
getTableInfoChecker()
.withName(tableName)
.withColumns(getSimpleTableColumn())
.withBucket(4, Arrays.asList("id", "name"));
Expand All @@ -672,7 +682,7 @@ void testCreateSortBucketTable() {
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
getTableInfoChecker()
.withName(tableName)
.withColumns(getSimpleTableColumn())
.withBucket(4, Arrays.asList("id", "name"), Arrays.asList("name", "id"));
Expand All @@ -695,7 +705,7 @@ void testCreateTableAsSelect() {

SparkTableInfo newTableInfo = getTableInfo(newTableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create().withName(newTableName).withColumns(getSimpleTableColumn());
getTableInfoChecker().withName(newTableName).withColumns(getSimpleTableColumn());
checker.check(newTableInfo);

List<String> tableData = getTableData(newTableName);
Expand Down Expand Up @@ -797,6 +807,7 @@ protected void deleteDirIfExists(String path) {
}

@Test
@EnabledIf("supportsProperties")
void testTableOptions() {
String tableName = "options_table";
dropTableIfExists(tableName);
Expand All @@ -806,7 +817,7 @@ void testTableOptions() {
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
getTableInfoChecker()
.withName(tableName)
.withTableProperties(ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "b"));
checker.check(tableInfo);
Expand Down Expand Up @@ -983,7 +994,7 @@ protected void createSimpleTable(String identifier) {

protected void checkTableColumns(
String tableName, List<SparkColumnInfo> columns, SparkTableInfo tableInfo) {
SparkTableInfoChecker.create()
getTableInfoChecker()
.withName(tableName)
.withColumns(columns)
.withComment(null)
Expand Down
Loading

0 comments on commit 8b20d04

Please sign in to comment.