Skip to content

Commit 7eb6215

Browse files
authored
Fix nullness in some of Beam SQL (#36890)
1 parent 99100aa commit 7eb6215

File tree

15 files changed

+150
-109
lines changed

15 files changed

+150
-109
lines changed
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
{
2-
"comment": "Modify this file in a trivial way to cause this test suite to run ",
3-
"modification": 1
2+
"https://github.com/apache/beam/pull/36890": "fixing some null errors"
43
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.impl;
1919

20+
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
21+
2022
import java.util.Collection;
2123
import java.util.Collections;
2224
import java.util.HashMap;
@@ -40,7 +42,6 @@
4042
* org.apache.beam.sdk.extensions.sql.meta.store.MetaStore}. In Beam SQL, a DATABASE refers to a
4143
* {@link BeamCalciteSchema}.
4244
*/
43-
@SuppressWarnings({"keyfor", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497)
4445
public class BeamCalciteSchema implements Schema {
4546
private JdbcConnection connection;
4647
private TableProvider tableProvider;
@@ -94,7 +95,9 @@ public Schema snapshot(SchemaVersion version) {
9495
}
9596

9697
@Override
97-
public Expression getExpression(SchemaPlus parentSchema, String name) {
98+
public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) {
99+
checkArgumentNotNull(
100+
parentSchema, "Cannot convert BeamCalciteSchema to Expression without parent schema");
98101
return Schemas.subSchemaExpression(parentSchema, name, getClass());
99102
}
100103

@@ -114,7 +117,7 @@ public Set<String> getTypeNames() {
114117
}
115118

116119
@Override
117-
public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table getTable(
120+
public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.@Nullable Table getTable(
118121
String name) {
119122
Table table = tableProvider.getTable(name);
120123
if (table == null) {
@@ -148,14 +151,20 @@ public Set<String> getSubSchemaNames() {
148151
* <p>Otherwise, the sub-schema is derived from the {@link TableProvider} implementation.
149152
*/
150153
@Override
151-
public Schema getSubSchema(String name) {
152-
if (!subSchemas.containsKey(name)) {
153-
BeamCalciteSchema subSchema;
154-
@Nullable TableProvider subProvider = tableProvider.getSubProvider(name);
155-
subSchema = subProvider != null ? new BeamCalciteSchema(name, connection, subProvider) : null;
156-
subSchemas.put(name, subSchema);
154+
public @Nullable Schema getSubSchema(String name) {
155+
BeamCalciteSchema subSchema = subSchemas.get(name);
156+
157+
if (subSchema != null) {
158+
return subSchema;
159+
}
160+
161+
@Nullable TableProvider subProvider = tableProvider.getSubProvider(name);
162+
if (subProvider == null) {
163+
return null;
157164
}
158165

159-
return subSchemas.get(name);
166+
subSchema = new BeamCalciteSchema(name, connection, subProvider);
167+
subSchemas.put(name, subSchema);
168+
return subSchema;
160169
}
161170
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion;
4141
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table;
4242
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
43+
import org.checkerframework.checker.nullness.qual.Nullable;
4344

4445
/**
4546
* Factory classes that Calcite uses to create initial schema for JDBC connection.
@@ -57,22 +58,21 @@
5758
* <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider,
5859
* org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers.
5960
*/
60-
@SuppressWarnings({
61-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
62-
})
6361
class BeamCalciteSchemaFactory {
6462

6563
/**
6664
* Called by {@link JdbcConnection} when initializing to convert the initial empty schema to
6765
* actual {@link BeamCalciteSchema}.
6866
*/
6967
static TableProvider fromInitialEmptySchema(JdbcConnection jdbcConnection) {
70-
InitialEmptySchema initialEmptySchema = jdbcConnection.getCurrentBeamSchema();
68+
InitialEmptySchema initialEmptySchema =
69+
(InitialEmptySchema) jdbcConnection.getCurrentBeamSchema();
7170
return initialEmptySchema.getTableProvider();
7271
}
7372

7473
static CatalogManager catalogFromInitialEmptySchema(JdbcConnection jdbcConnection) {
75-
InitialEmptySchema initialEmptySchema = jdbcConnection.getCurrentBeamSchema();
74+
InitialEmptySchema initialEmptySchema =
75+
(InitialEmptySchema) jdbcConnection.getCurrentBeamSchema();
7676
return initialEmptySchema.getCatalogManager();
7777
}
7878

@@ -209,7 +209,7 @@ public Set<String> getSubSchemaNames() {
209209
}
210210

211211
@Override
212-
public Expression getExpression(SchemaPlus parentSchema, String name) {
212+
public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) {
213213
return illegal();
214214
}
215215

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,21 @@
4242
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
4343
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.TranslatableTable;
4444
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
45+
import org.checkerframework.checker.nullness.qual.Nullable;
4546

4647
/** Adapter from {@link BeamSqlTable} to a calcite Table. */
47-
@SuppressWarnings({
48-
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
49-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
50-
})
5148
public class BeamCalciteTable extends AbstractQueryableTable
5249
implements ModifiableTable, TranslatableTable {
5350
private final BeamSqlTable beamTable;
5451
// These two options should be unified.
5552
// https://issues.apache.org/jira/projects/BEAM/issues/BEAM-7590
5653
private final Map<String, String> pipelineOptionsMap;
57-
private PipelineOptions pipelineOptions;
54+
private @Nullable PipelineOptions pipelineOptions;
5855

5956
BeamCalciteTable(
6057
BeamSqlTable beamTable,
6158
Map<String, String> pipelineOptionsMap,
62-
PipelineOptions pipelineOptions) {
59+
@Nullable PipelineOptions pipelineOptions) {
6360
super(Object[].class);
6461
this.beamTable = beamTable;
6562
this.pipelineOptionsMap = pipelineOptionsMap;
@@ -117,7 +114,7 @@ public <T> Queryable<T> asQueryable(
117114
}
118115

119116
@Override
120-
public Collection getModifiableCollection() {
117+
public @Nullable Collection getModifiableCollection() {
121118
return null;
122119
}
123120

@@ -128,8 +125,8 @@ public TableModify toModificationRel(
128125
Prepare.CatalogReader catalogReader,
129126
RelNode child,
130127
TableModify.Operation operation,
131-
List<String> updateColumnList,
132-
List<RexNode> sourceExpressionList,
128+
@Nullable List<String> updateColumnList,
129+
@Nullable List<RexNode> sourceExpressionList,
133130
boolean flattened) {
134131
return new BeamIOSinkRel(
135132
cluster,

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.beam.sdk.extensions.sql.impl;
1919

2020
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
21-
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2221

2322
import java.sql.SQLException;
2423
import java.util.AbstractMap.SimpleEntry;
@@ -51,17 +50,13 @@
5150
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
5251
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
5352
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet;
54-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
53+
import org.checkerframework.checker.nullness.qual.Nullable;
5554

5655
/**
5756
* Contains the metadata of tables/UDF functions, and exposes APIs to
5857
* query/validate/optimize/translate SQL statements.
5958
*/
6059
@Internal
61-
@SuppressWarnings({
62-
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
63-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
64-
})
6560
public class BeamSqlEnv {
6661
JdbcConnection connection;
6762
QueryPlanner planner;
@@ -151,16 +146,14 @@ public static class BeamSqlEnvBuilder {
151146
"org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner";
152147
private String queryPlannerClassName;
153148
private CatalogManager catalogManager;
154-
private String currentSchemaName;
149+
private @Nullable String currentSchemaName = null;
155150
private Map<String, TableProvider> schemaMap;
156151
private Set<Map.Entry<String, Function>> functionSet;
157152
private boolean autoLoadUdfs;
158-
private PipelineOptions pipelineOptions;
153+
private @Nullable PipelineOptions pipelineOptions;
159154
private Collection<RuleSet> ruleSets;
160155

161156
private BeamSqlEnvBuilder(TableProvider tableProvider) {
162-
checkNotNull(tableProvider, "Table provider for the default schema must be sets.");
163-
164157
if (tableProvider instanceof MetaStore) {
165158
catalogManager = new InMemoryCatalogManager((MetaStore) tableProvider);
166159
} else {
@@ -176,8 +169,6 @@ private BeamSqlEnvBuilder(TableProvider tableProvider) {
176169
}
177170

178171
private BeamSqlEnvBuilder(CatalogManager catalogManager) {
179-
checkNotNull(catalogManager, "Catalog manager for the default schema must be set.");
180-
181172
this.catalogManager = catalogManager;
182173
this.queryPlannerClassName = CALCITE_PLANNER;
183174
this.schemaMap = new HashMap<>();
@@ -287,7 +278,9 @@ private void configureSchemas(JdbcConnection jdbcConnection) {
287278
// Does not update the current default schema.
288279
schemaMap.forEach(jdbcConnection::setSchema);
289280

290-
if (Strings.isNullOrEmpty(currentSchemaName)) {
281+
// Fix it in a local variable so static analysis knows it cannot be mutated.
282+
@Nullable String currentSchemaName = this.currentSchemaName;
283+
if (currentSchemaName == null || currentSchemaName.isEmpty()) {
291284
return;
292285
}
293286

@@ -328,9 +321,18 @@ private QueryPlanner instantiatePlanner(
328321
"Cannot find requested QueryPlanner class: " + queryPlannerClassName, exc);
329322
}
330323

324+
// This try/catch kept deliberately tight to ensure that we _only_ catch exceptions due to
325+
// this reflective access.
331326
QueryPlanner.Factory factory;
332327
try {
333-
factory = (QueryPlanner.Factory) queryPlannerClass.getField("FACTORY").get(null);
328+
// See https://github.com/typetools/jdk/pull/235#pullrequestreview-3400922783
329+
@SuppressWarnings("nullness")
330+
Object queryPlannerFactoryObj =
331+
checkStateNotNull(
332+
queryPlannerClass.getField("FACTORY").get(null),
333+
"Static field %s.FACTORY is null. It must be a QueryPlanner.Factory instance.",
334+
queryPlannerClass);
335+
factory = (QueryPlanner.Factory) queryPlannerFactoryObj;
334336
} catch (NoSuchFieldException | IllegalAccessException exc) {
335337
throw new RuntimeException(
336338
String.format(

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.impl;
1919

20+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
21+
2022
import java.sql.SQLException;
2123
import java.util.Collections;
2224
import java.util.Map;
@@ -27,6 +29,7 @@
2729
import org.apache.beam.sdk.values.KV;
2830
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteConnection;
2931
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
32+
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
3033
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
3134
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
3235
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -38,9 +41,6 @@
3841
* {@link BeamCalciteSchema BeamCalciteSchemas} keep reference to this connection. Pipeline options
3942
* are stored here.
4043
*/
41-
@SuppressWarnings({
42-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
43-
})
4444
public class JdbcConnection extends CalciteConnectionWrapper {
4545
/**
4646
* Connection string parameters that begin with {@code "beam."} will be interpreted as {@link
@@ -49,7 +49,7 @@ public class JdbcConnection extends CalciteConnectionWrapper {
4949
private static final String PIPELINE_OPTION_PREFIX = "beam.";
5050

5151
private Map<String, String> pipelineOptionsMap;
52-
private PipelineOptions pipelineOptions;
52+
private @Nullable PipelineOptions pipelineOptions;
5353

5454
private JdbcConnection(CalciteConnection connection) throws SQLException {
5555
super(connection);
@@ -62,16 +62,16 @@ private JdbcConnection(CalciteConnection connection) throws SQLException {
6262
* <p>Sets the pipeline options, replaces the initial non-functional top-level schema with schema
6363
* created by {@link BeamCalciteSchemaFactory}.
6464
*/
65-
static @Nullable JdbcConnection initialize(CalciteConnection connection) {
65+
static JdbcConnection initialize(CalciteConnection connection) {
6666
try {
67-
if (connection == null) {
68-
return null;
69-
}
67+
String currentSchemaName =
68+
checkStateNotNull(
69+
connection.getSchema(), "When trying to initialize JdbcConnection: No schema set.");
7070

7171
JdbcConnection jdbcConnection = new JdbcConnection(connection);
7272
jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection));
7373
jdbcConnection.setSchema(
74-
connection.getSchema(),
74+
currentSchemaName,
7575
BeamCalciteSchemaFactory.catalogFromInitialEmptySchema(jdbcConnection));
7676
return jdbcConnection;
7777
} catch (SQLException e) {
@@ -107,27 +107,29 @@ public void setPipelineOptions(PipelineOptions pipelineOptions) {
107107
this.pipelineOptions = pipelineOptions;
108108
}
109109

110-
public PipelineOptions getPipelineOptions() {
110+
public @Nullable PipelineOptions getPipelineOptions() {
111111
return this.pipelineOptions;
112112
}
113113

114114
/** Get the current default schema from the root schema. */
115-
@SuppressWarnings("TypeParameterUnusedInFormals")
116-
<T> T getCurrentBeamSchema() {
117-
try {
118-
return (T) CalciteSchema.from(getRootSchema().getSubSchema(getSchema())).schema;
119-
} catch (SQLException e) {
120-
throw new RuntimeException(e);
121-
}
115+
Schema getCurrentBeamSchema() {
116+
return CalciteSchema.from(getCurrentSchemaPlus()).schema;
122117
}
123118

124119
/** Calcite-created {@link SchemaPlus} wrapper for the current schema. */
125120
public SchemaPlus getCurrentSchemaPlus() {
121+
String currentSchema;
126122
try {
127-
return getRootSchema().getSubSchema(getSchema());
123+
currentSchema = checkStateNotNull(getSchema(), "Current schema not set");
128124
} catch (SQLException e) {
129125
throw new RuntimeException(e);
130126
}
127+
128+
return checkStateNotNull(
129+
getRootSchema().getSubSchema(currentSchema),
130+
"SubSchema not found in `%s`: %s",
131+
getRootSchema().getName(),
132+
currentSchema);
131133
}
132134

133135
/**

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.rules.CoreRules;
4646
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.runtime.Hook;
4747
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet;
48+
import org.checkerframework.checker.nullness.qual.Nullable;
4849

4950
/**
5051
* Calcite JDBC driver with Beam defaults.
@@ -56,10 +57,6 @@
5657
* <p>The querystring-style parameters are parsed as {@link PipelineOptions}.
5758
*/
5859
@AutoService(java.sql.Driver.class)
59-
@SuppressWarnings({
60-
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
61-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
62-
})
6360
public class JdbcDriver extends Driver {
6461
public static final JdbcDriver INSTANCE = new JdbcDriver();
6562
public static final String CONNECT_STRING_PREFIX = "jdbc:beam:";
@@ -129,10 +126,18 @@ protected String getConnectStringPrefix() {
129126
* CalciteConnection}.
130127
*/
131128
@Override
132-
public Connection connect(String url, Properties info) throws SQLException {
129+
@SuppressWarnings("override.return") // https://github.com/typetools/jdk/pull/246
130+
public @Nullable Connection connect(String url, Properties info) throws SQLException {
131+
@Nullable CalciteConnection connection = (CalciteConnection) super.connect(url, info);
132+
133+
// null here means that CalciteConnection is not a "suitable driver" based on the parameters
134+
if (connection == null) {
135+
return null;
136+
}
137+
133138
// calciteConnection is initialized with an empty Beam schema,
134139
// we need to populate it with pipeline options, load table providers, etc
135-
return JdbcConnection.initialize((CalciteConnection) super.connect(url, info));
140+
return JdbcConnection.initialize(connection);
136141
}
137142

138143
/**
@@ -176,6 +181,12 @@ private static JdbcConnection getConnection(PipelineOptions options) {
176181
JdbcConnection connection;
177182
try {
178183
connection = (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
184+
// Normally, #connect is allowed to return null when the URL is not suitable. Here, however,
185+
// we are
186+
// deliberately passing a bogus URL to instantiate a connection, so it should never be null.
187+
if (connection == null) {
188+
throw new SQLException("Unexpected null when creating synthetic Beam JdbcDriver");
189+
}
179190
} catch (SQLException e) {
180191
throw new RuntimeException(e);
181192
}

0 commit comments

Comments
 (0)