Skip to content

Commit 1ea853f

Browse files
authored
Add create table support, venice deployer, and various fixes (#182)
* Add base CREATE TABLE * Add Venice Deployer * Fix PipelineReconciler and AvroConverter defaults * Add drop tests * Fix failing tests caused by specify closing connections prematurely * Add back some comments about copy-pasted code * Add log
1 parent 9653817 commit 1ea853f

File tree

23 files changed

+970
-49
lines changed

23 files changed

+970
-49
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
/build
44
*/build/
55
*/*.iml
6+
*bin/
67
./models/external/
78
.DS_Store
89

deploy/config/hoptimator-configmap.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,7 @@ data:
2222
flink.config: |
2323
flink.app.name=hoptimator-flink-runner
2424
flink.app.type=SQL
25+
26+
venice.config: |
27+
router.url=http://localhost:7777
28+
clusters=venice-cluster0

hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,15 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
3535
String newNamespace = namespace + "." + name;
3636
if (dataType.isStruct()) {
3737
List<Schema.Field> fields = dataType.getFieldList().stream()
38-
.map(x -> new Schema.Field(sanitize(x.getName()), avro(newNamespace, x.getName(), x.getType()), describe(x), null))
38+
.map(x -> {
39+
Schema innerField = avro(newNamespace, x.getName(), x.getType());
40+
Object defaultValue = null;
41+
// For unions containing null, defaults are specified in a specific way
42+
if (innerField.isUnion() && innerField.isNullable()) {
43+
defaultValue = Schema.Field.NULL_DEFAULT_VALUE;
44+
}
45+
return new Schema.Field(sanitize(x.getName()), innerField, describe(x), defaultValue);
46+
})
3947
.collect(Collectors.toList());
4048
return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), newNamespace, false, fields),
4149
dataType.isNullable());
@@ -150,6 +158,7 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
150158
/** Converts Avro Schema to RelDataType.
151159
* Nullability is preserved except for array types, JDBC is incapable of interpreting e.g. "FLOAT NOT NULL ARRAY"
152160
* causing "NOT NULL" arrays to get demoted to "ANY ARRAY" which is not desired.
161+
* TODO: default field values are lost when converting from Avro to RelDataType
153162
*/
154163
public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) {
155164
RelDataType unknown = typeFactory.createUnknownType();

hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroConverterTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.List;
55
import java.util.Map;
66
import java.util.Objects;
7+
import org.apache.avro.JsonProperties;
78
import org.apache.avro.Schema;
89
import org.apache.calcite.plan.RelOptUtil;
910
import org.apache.calcite.rel.type.RelDataType;
@@ -47,6 +48,7 @@ public void convertsNestedSchemas() {
4748
Schema avroSchema4 = AvroConverter.avro("NS", "R", rel1);
4849
assertFalse(avroSchema4.isNullable());
4950
assertEquals(avroSchema4.getFields().size(), rel1.getFieldCount());
51+
assertEquals(JsonProperties.NULL_VALUE, avroSchema4.getField("h").defaultVal());
5052
Schema avroSchema5 = AvroConverter.avro("NS", "R", rel2);
5153
assertTrue(avroSchema5.isNullable());
5254
assertEquals(avroSchema5.getTypes().get(1).getFields().size(), rel2.getFieldCount());

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

Lines changed: 285 additions & 31 deletions
Large diffs are not rendered by default.

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private HoptimatorDdlUtils() {
6060
// N.B. copy-pasted from Apache Calcite
6161
/** Returns the schema in which to create an object;
6262
* the left part is null if the schema does not exist. */
63-
static Pair<CalciteSchema, String> schema(CalcitePrepare.Context context, boolean mutable, SqlIdentifier id) {
63+
public static Pair<CalciteSchema, String> schema(CalcitePrepare.Context context, boolean mutable, SqlIdentifier id) {
6464
final String name;
6565
final List<String> path;
6666
if (id.isSimple()) {
@@ -92,7 +92,7 @@ public static SqlNode renameColumns(SqlNodeList columnList, SqlNode query) {
9292
}
9393

9494
// N.B. copy-pasted from Apache Calcite
95-
static ViewTable viewTable(CalcitePrepare.Context context, String sql, CalcitePrepareImpl impl,
95+
public static ViewTable viewTable(CalcitePrepare.Context context, String sql, CalcitePrepareImpl impl,
9696
List<String> schemaPath, List<String> viewPath) {
9797
CalcitePrepare.AnalyzeViewResult analyzed = impl.analyzeView(context, sql, false);
9898
RelProtoDataType protoType = RelDataTypeImpl.proto(analyzed.rowType);

hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestBasicSql.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,20 @@ public void createView() throws Exception {
4545
var expectedLogs = List.of(
4646
"[HoptimatorDdlExecutor] Validating statement: CREATE VIEW `V` AS\nSELECT *\nFROM `T`",
4747
"[HoptimatorDdlExecutor] Validated sql statement. The view is named V and has path [DEFAULT, V]",
48-
"[HoptimatorDdlExecutor] Validating view V with deployers",
48+
"[HoptimatorDdlExecutor] Validating deployable resources for view V",
4949
"[HoptimatorDdlExecutor] Validated view V",
5050
"[HoptimatorDdlExecutor] Deploying create view V",
5151
"[HoptimatorDdlExecutor] Deployed view V",
5252
"[HoptimatorDdlExecutor] Added view V to schema DEFAULT",
53-
"[HoptimatorDdlExecutor] CREATE VIEW V completed");
53+
"[HoptimatorDdlExecutor] CREATE VIEW V completed",
54+
"[HoptimatorDdlExecutor] Validating statement: DROP VIEW `V`",
55+
"[HoptimatorDdlExecutor] Deleting view V",
56+
"[HoptimatorDdlExecutor] Removed view V from schema DEFAULT",
57+
"[HoptimatorDdlExecutor] DROP VIEW `V` completed",
58+
"[HoptimatorDdlExecutor] Validating statement: DROP TABLE `T`",
59+
"[HoptimatorDdlExecutor] Deleting table T",
60+
"[HoptimatorDdlExecutor] Removed table T from schema DEFAULT",
61+
"[HoptimatorDdlExecutor] DROP TABLE `T` completed");
5462
Assertions.assertEquals(expectedLogs, logs);
5563
}
5664

@@ -64,7 +72,7 @@ public void dropNonExistentViewHandlesNullSchema() throws Exception {
6472
sql("DROP VIEW non_existing_schema.non_existing_view");
6573
});
6674
Assertions.assertTrue(
67-
ex.getMessage().matches("(?s).*Cannot DROP VIEW .*?: View .*? not found\\..*"),
75+
ex.getMessage().matches("(?s).*Cannot DROP VIEW .*?: Element .*? not found\\..*"),
6876
"Error message should match regex, but was: " + ex.getMessage()
6977
);
7078
}

hoptimator-k8s/src/test/resources/k8s-ddl.id

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,25 @@ select * from ads."PAGE_VIEWS$filter";
118118

119119
!ok
120120

121+
create or replace table ads."newtable" ("i" int, "s" VARCHAR);
122+
(0 rows modified)
123+
124+
!update
125+
126+
select * from ads."newtable";
127+
+---+---+
128+
| i | s |
129+
+---+---+
130+
+---+---+
131+
(0 rows)
132+
133+
!ok
134+
135+
drop table ads."newtable";
136+
(0 rows modified)
137+
138+
!update
139+
121140
drop materialized view ads."PAGE_VIEWS$filter";
122141
(0 rows modified)
123142

hoptimator-k8s/src/test/resources/k8s-metadata.id

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,12 @@ drop materialized view ads.pages;
115115

116116
!update
117117

118-
drop materialized view ads.audience;
118+
drop materialized view ads.audience2;
119119
(0 rows modified)
120120

121121
!update
122122

123-
drop materialized view ads.audience2;
123+
drop view ads.audience;
124124
(0 rows modified)
125125

126126
!update

hoptimator-k8s/src/test/resources/k8s-validation.id

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ create or replace materialized view ads.page_views as select first_name, last_na
55
Cannot overwrite physical table PAGE_VIEWS with a view.
66
!error
77

8+
create table ads.page_views ("i" int);
9+
Table PAGE_VIEWS already exists.
10+
!error
11+
812
create or replace materialized view invalid.page_views as select first_name, last_name from profile.members;
913
Schema for INVALID.PAGE_VIEWS not found.
1014
!error
@@ -18,11 +22,23 @@ create materialized view ads.audience as select first_name, last_name from ads.p
1822
View AUDIENCE already exists. Use CREATE OR REPLACE to update.
1923
!error
2024

25+
drop table ads.audience;
26+
AUDIENCE is a materialized view and does not correspond to DROP TABLE
27+
!error
28+
2129
drop materialized view ads.audience;
2230
(0 rows modified)
2331

2432
!update
2533

34+
drop materialized view ads.audience;
35+
Element AUDIENCE not found
36+
!error
37+
38+
drop materialized view ads.page_views;
39+
PAGE_VIEWS is a table and does not correspond to DROP MATERIALIZED VIEW
40+
!error
41+
2642
create or replace materialized view ads."PAGE_VIEWS$myview" as select first_name from profile.members;
2743
Field FIRST_NAME not found in sink schema
2844
!error

0 commit comments

Comments
 (0)