Commit

resolve conversation
liangyouze committed Jan 22, 2025
1 parent 95463d6 commit dfff9aa
Showing 15 changed files with 40 additions and 20 deletions.
3 changes: 3 additions & 0 deletions docs/spark-connector/spark-catalog-jdbc.md
@@ -9,6 +9,8 @@ The Apache Gravitino Spark connector offers the capability to read JDBC tables,

## Capabilities

Supports MySQL and PostgreSQL. OceanBase, which is compatible with the MySQL dialect, can use the MySQL driver and MySQL dialect as a workaround; Doris, which does not support the MySQL dialect, is not currently supported. A usage sketch follows the operation lists below.

#### Supported DML and DDL operations:

- `CREATE TABLE`
@@ -22,6 +24,7 @@ The Apache Gravitino Spark connector offers the capability to read JDBC tables,
:::

#### Unsupported operations:

- `UPDATE`
- `DELETE`
- `TRUNCATE`
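To make the capability lists concrete, here is a minimal sketch of the supported operations issued through Spark SQL, assuming a JDBC catalog named `jdbc_mysql` has already been registered through the Gravitino Spark connector (catalog, schema, and table names are illustrative):

```java
import org.apache.spark.sql.SparkSession;

public class JdbcCatalogSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("jdbc-catalog-sketch").getOrCreate();
    // "jdbc_mysql" is a hypothetical Gravitino-managed catalog name.
    spark.sql("CREATE DATABASE IF NOT EXISTS jdbc_mysql.demo");
    spark.sql("CREATE TABLE jdbc_mysql.demo.users (id INT, name STRING)");
    spark.sql("INSERT INTO jdbc_mysql.demo.users VALUES (1, 'alice')");
    spark.sql("SELECT * FROM jdbc_mysql.demo.users").show();
    // UPDATE, DELETE, and TRUNCATE are among the unsupported operations listed above.
    spark.stop();
  }
}
```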
2 changes: 1 addition & 1 deletion docs/spark-connector/spark-connector.md
@@ -11,7 +11,7 @@ The Apache Gravitino Spark connector leverages the Spark DataSourceV2 interface

## Capabilities

1. Supports [Hive catalog](spark-catalog-hive.md), [Iceberg catalog](spark-catalog-iceberg.md) and [Paimon catalog](spark-catalog-paimon.md).
1. Supports [Hive catalog](spark-catalog-hive.md), [Iceberg catalog](spark-catalog-iceberg.md), [Paimon catalog](spark-catalog-paimon.md), and [JDBC catalog](spark-catalog-jdbc.md).
2. Supports federated queries, as sketched after this list.
3. Supports most DDL and DML SQLs.
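
A hedged sketch of the federated-query capability from item 2, assuming two Gravitino-managed catalogs named `hive` and `jdbc_mysql` (all identifiers are illustrative):

```java
import org.apache.spark.sql.SparkSession;

public class FederatedQuerySketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("federated-query-sketch").getOrCreate();
    // A single statement can join tables that live in different catalogs.
    spark.sql(
        "SELECT o.order_id, u.name "
            + "FROM jdbc_mysql.sales.orders AS o "
            + "JOIN hive.warehouse.users AS u ON o.user_id = u.id")
        .show();
    spark.stop();
  }
}
```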

3 changes: 0 additions & 3 deletions spark-connector/spark-common/build.gradle.kts
@@ -39,9 +39,6 @@ val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat

dependencies {
implementation(project(":catalogs:catalog-common"))
implementation(project(":catalogs:catalog-jdbc-common")) {
exclude("org.apache.logging.log4j")
}
implementation(libs.guava)

compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))
@@ -29,6 +29,8 @@ public class SparkJdbcTypeConverter extends SparkTypeConverter {

@Override
public DataType toSparkType(Type gravitinoType) {
// If the Spark version is lower than 3.4.4, using VarCharType will throw an exception:
// Unsupported type varchar.
if (gravitinoType instanceof Types.VarCharType) {
return DataTypes.StringType;
} else {
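A small sketch of the behavior this hunk pins down, assuming Gravitino's `Types.VarCharType.of(int)` factory and a no-arg converter constructor (both inferred from the diff rather than verified against the full sources):

```java
import org.apache.gravitino.rel.types.Types;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

class SparkJdbcTypeConverterSketch {
  public static void main(String[] args) {
    // VARCHAR(255) is widened to StringType so Spark versions below 3.4.4
    // never see the unsupported "varchar" type.
    DataType sparkType = new SparkJdbcTypeConverter().toSparkType(Types.VarCharType.of(255));
    System.out.println(sparkType == DataTypes.StringType); // true
  }
}
```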
@@ -119,7 +119,7 @@ private static String getRowLevelDeleteTableSql(

protected abstract boolean supportsReplaceColumns();

protected abstract boolean supportsProperties();
protected abstract boolean supportsSchemaAndTableProperties();

protected abstract boolean supportsComplexType();

@@ -197,7 +197,7 @@ void testLoadCatalogs() {
}

@Test
@EnabledIf("supportsProperties")
@EnabledIf("supportsSchemaAndTableProperties")
protected void testCreateAndLoadSchema() {
String testDatabaseName = "t_create1";
dropDatabaseIfExists(testDatabaseName);
@@ -227,7 +227,7 @@ protected void testCreateAndLoadSchema() {
}

@Test
@EnabledIf("supportsProperties")
@EnabledIf("supportsSchemaAndTableProperties")
protected void testAlterSchema() {
String testDatabaseName = "t_alter";
dropDatabaseIfExists(testDatabaseName);
@@ -405,7 +405,7 @@ void testListTable() {
}

@Test
@EnabledIf("supportsProperties")
@EnabledIf("supportsSchemaAndTableProperties")
void testAlterTableSetAndRemoveProperty() {
String tableName = "test_property";
dropTableIfExists(tableName);
@@ -807,7 +807,7 @@ protected void deleteDirIfExists(String path) {
}

@Test
@EnabledIf("supportsProperties")
@EnabledIf("supportsSchemaAndTableProperties")
void testTableOptions() {
String tableName = "options_table";
dropTableIfExists(tableName);
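The renamed guard rides on JUnit 5's `@EnabledIf`, which resolves the named no-arg boolean method reflectively; a minimal self-contained sketch of that mechanism (class name and return value are illustrative):

```java
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;

class GuardedTestSketch {
  // JUnit looks this method up by the name given in @EnabledIf.
  boolean supportsSchemaAndTableProperties() {
    return false; // e.g. the JDBC MySQL suite opts out, so guarded tests are skipped
  }

  @Test
  @EnabledIf("supportsSchemaAndTableProperties")
  void testAlterTableSetAndRemoveProperty() {
    // Runs only when the guard above returns true.
  }
}
```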
@@ -85,7 +85,7 @@ protected boolean supportsReplaceColumns() {
}

@Override
protected boolean supportsProperties() {
protected boolean supportsSchemaAndTableProperties() {
return true;
}

@@ -61,12 +61,10 @@
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@Tag("gravitino-docker-test")
public abstract class SparkIcebergCatalogIT extends SparkCommonIT {

private static final String ICEBERG_FORMAT_VERSION = "format-version";
@@ -112,7 +110,7 @@ protected boolean supportsReplaceColumns() {
}

@Override
protected boolean supportsProperties() {
protected boolean supportsSchemaAndTableProperties() {
return true;
}

@@ -26,7 +26,7 @@
import org.junit.jupiter.api.Tag;

@Tag("gravitino-docker-test")
public abstract class SparkJdbcCatalogIT extends SparkCommonIT {
public abstract class SparkJdbcMysqlCatalogIT extends SparkCommonIT {
@Override
protected boolean supportsSparkSQLClusteredBy() {
return false;
@@ -53,7 +53,7 @@ protected boolean supportsReplaceColumns() {
}

@Override
protected boolean supportsProperties() {
protected boolean supportsSchemaAndTableProperties() {
return false;
}

@@ -31,6 +31,7 @@ public static SparkJdbcTableInfoChecker create() {
return new SparkJdbcTableInfoChecker();
}

// A Spark JDBC table cannot distinguish between comment=null and comment=""
@Override
public SparkTableInfoChecker withColumns(List<SparkTableInfo.SparkColumnInfo> columns) {
getExpectedTableInfo()
@@ -64,7 +64,7 @@ protected boolean supportsSchemaEvolution() {
}

@Override
protected boolean supportsProperties() {
protected boolean supportsSchemaAndTableProperties() {
return true;
}

@@ -23,7 +23,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SparkJdbcCatalogIT33 extends SparkJdbcCatalogIT {
public class SparkJdbcMysqlCatalogIT33 extends SparkJdbcMysqlCatalogIT {
@Test
void testCatalogClassName() {
String catalogClass =
@@ -28,6 +28,8 @@
public class SparkJdbcTypeConverter34 extends SparkTypeConverter34 {
@Override
public DataType toSparkType(Type gravitinoType) {
// If the Spark version is lower than 3.4.4, using VarCharType will throw an exception:
// Unsupported type varchar.
if (gravitinoType instanceof Types.VarCharType) {
return DataTypes.StringType;
} else {
@@ -22,7 +22,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SparkJdbcCatalogIT34 extends SparkJdbcCatalogIT {
public class SparkJdbcMysqlCatalogIT34 extends SparkJdbcMysqlCatalogIT {
@Test
void testCatalogClassName() {
String catalogClass =
@@ -19,4 +19,21 @@

package org.apache.gravitino.spark.connector.jdbc;

public class GravitinoJdbcCatalogSpark35 extends GravitinoJdbcCatalogSpark34 {}
import org.apache.gravitino.spark.connector.SparkTableChangeConverter;
import org.apache.gravitino.spark.connector.SparkTableChangeConverter34;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter34;

public class GravitinoJdbcCatalogSpark35 extends GravitinoJdbcCatalog {
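// Spark 3.5 reuses the Spark 3.4 type and table-change converters rather than
// introducing 3.5-specific variants.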

@Override
protected SparkTypeConverter getSparkTypeConverter() {
return new SparkTypeConverter34();
}

@Override
protected SparkTableChangeConverter getSparkTableChangeConverter(
SparkTypeConverter sparkTypeConverter) {
return new SparkTableChangeConverter34(sparkTypeConverter);
}
}
@@ -23,7 +23,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SparkJdbcCatalogIT35 extends SparkJdbcCatalogIT {
public class SparkJdbcMysqlCatalogIT35 extends SparkJdbcMysqlCatalogIT {
@Test
void testCatalogClassName() {
String catalogClass =