Skip to content

Commit 84c65e3

Browse files
Add PAUSE/RESUME/DROP TRIGGER support for TableTriggers (#186)
* Add PAUSE/RESUME TRIGGER support for TableTriggers * Add DROP TRIGGER Support * Spotbug fix * Fix test setup --------- Co-authored-by: Shrinand Thakkar <[email protected]>
1 parent 1ea853f commit 84c65e3

File tree

14 files changed

+383
-38
lines changed

14 files changed

+383
-38
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ deploy-demo: deploy
4242
kubectl apply -f ./deploy/samples/demodb.yaml
4343
kubectl apply -f ./deploy/samples/tabletriggers.yaml
4444
kubectl apply -f ./deploy/samples/crontrigger.yaml
45+
kubectl apply -f ./deploy/samples/user-jobs.yaml
4546

4647
undeploy-demo: undeploy
4748
kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"

hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
public class Trigger implements Deployable {
88

9+
public static final String PAUSED_OPTION = "paused";
910
private final String name;
1011
private final UserJob job;
1112
private final List<String> path;

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView;
3333
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger;
3434
import com.linkedin.hoptimator.util.ArrayTable;
35+
import com.linkedin.hoptimator.jdbc.ddl.SqlDropTrigger;
36+
import com.linkedin.hoptimator.jdbc.ddl.SqlPauseTrigger;
37+
import com.linkedin.hoptimator.jdbc.ddl.SqlResumeTrigger;
3538
import com.linkedin.hoptimator.util.DeploymentService;
3639
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;
3740
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcTable;
@@ -41,6 +44,7 @@
4144
import java.util.ArrayList;
4245
import java.util.Collection;
4346
import java.util.Collections;
47+
import java.util.HashMap;
4448
import java.util.List;
4549
import java.util.Map;
4650
import java.util.Properties;
@@ -61,6 +65,7 @@
6165
import org.apache.calcite.schema.impl.ViewTable;
6266
import org.apache.calcite.server.DdlExecutor;
6367
import org.apache.calcite.server.ServerDdlExecutor;
68+
import org.apache.calcite.sql.SqlIdentifier;
6469
import org.apache.calcite.sql.SqlKind;
6570
import org.apache.calcite.sql.SqlLiteral;
6671
import org.apache.calcite.sql.SqlNode;
@@ -490,8 +495,84 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
490495
logger.info("CREATE TABLE {} completed", tableName);
491496
}
492497

493-
// N.B. originally copy-pasted from Apache Calcite
498+
/** Executes a {@code PAUSE TRIGGER} command. */
499+
public void execute(SqlPauseTrigger pause, CalcitePrepare.Context context) {
500+
updateTriggerPausedState(pause, pause.name, true);
501+
}
502+
503+
/** Executes a {@code RESUME TRIGGER} command. */
504+
public void execute(SqlResumeTrigger resume, CalcitePrepare.Context context) {
505+
updateTriggerPausedState(resume, resume.name, false);
506+
}
507+
508+
/** Executes a {@code DROP TRIGGER} command. */
509+
public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) {
510+
logger.info("Validating statement: {}", drop);
511+
try {
512+
ValidationService.validateOrThrow(drop);
513+
} catch (SQLException e) {
514+
throw new DdlException(drop, e.getMessage(), e);
515+
}
516+
517+
if (drop.name.names.size() > 1) {
518+
throw new DdlException(drop, "Triggers cannot belong to a schema or database.");
519+
}
520+
String name = drop.name.names.get(0);
521+
522+
Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, new HashMap<>());
523+
524+
Collection<Deployer> deployers = null;
525+
try {
526+
logger.info("Deleting trigger {}", name);
527+
deployers = DeploymentService.deployers(trigger, connection);
528+
DeploymentService.delete(deployers);
529+
logger.info("Deleted trigger {}", name);
530+
logger.info("DROP TRIGGER {} completed", name);
531+
} catch (Exception e) {
532+
if (deployers != null) {
533+
DeploymentService.restore(deployers);
534+
}
535+
// Handle IF EXISTS
536+
if (drop.ifExists && e.getMessage() != null && e.getMessage().contains("Error getting TableTrigger")) {
537+
logger.info("Trigger {} does not exist (IF EXISTS specified)", name);
538+
return;
539+
}
540+
throw new DdlException(drop, e.getMessage(), e);
541+
}
542+
}
543+
544+
private void updateTriggerPausedState(SqlNode sqlNode, SqlIdentifier triggerName, boolean paused) {
545+
logger.info("Validating statement: {}", sqlNode);
546+
try {
547+
ValidationService.validateOrThrow(sqlNode);
548+
} catch (SQLException e) {
549+
throw new DdlException(sqlNode, e.getMessage(), e);
550+
}
551+
552+
if (triggerName.names.size() > 1) {
553+
throw new DdlException(sqlNode, "Triggers cannot belong to a schema or database.");
554+
}
555+
String name = triggerName.names.get(0);
556+
557+
Map<String, String> options = new HashMap<>();
558+
options.put(Trigger.PAUSED_OPTION, String.valueOf(paused));
559+
Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, options);
560+
561+
Collection<Deployer> deployers = null;
562+
try {
563+
logger.info("Updating trigger {} with paused state: {}", name, paused);
564+
deployers = DeploymentService.deployers(trigger, connection);
565+
DeploymentService.update(deployers);
566+
logger.info("Successfully updated trigger {} with paused state: {}", name, paused);
567+
} catch (Exception e) {
568+
if (deployers != null) {
569+
DeploymentService.restore(deployers);
570+
}
571+
throw new DdlException(sqlNode, e.getMessage(), e);
572+
}
573+
}
494574

575+
// N.B. largely copy-pasted from Apache Calcite
495576
/** Executes {@code DROP FUNCTION}, {@code DROP TABLE}, {@code DROP MATERIALIZED VIEW}, {@code DROP TYPE},
496577
* {@code DROP VIEW} commands. */
497578
@Override

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTriggerTable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ public static class Row {
1515
public String NAME;
1616
public String SCHEMA;
1717
public String TABLE;
18+
public Boolean PAUSED;
1819
public String TIMESTAMP;
1920
public String WATERMARK;
2021

21-
public Row(String name, String schema, String table, String timestamp, String watermark) {
22+
public Row(String name, String schema, String table, Boolean paused, String timestamp, String watermark) {
2223
this.NAME = name;
2324
this.SCHEMA = schema;
2425
this.TABLE = table;
26+
this.PAUSED = paused;
2527
this.TIMESTAMP = timestamp;
2628
this.WATERMARK = watermark;
2729
}
@@ -35,6 +37,7 @@ public K8sTableTriggerTable(K8sContext context) {
3537
@Override
3638
public Row toRow(V1alpha1TableTrigger obj) {
3739
return new Row(obj.getMetadata().getName(), obj.getSpec().getSchema(), obj.getSpec().getTable(),
40+
obj.getSpec().getPaused(),
3841
Optional.ofNullable(obj.getStatus())
3942
.flatMap(x -> Optional.ofNullable(x.getTimestamp()))
4043
.map(x -> x.toString()).orElse(null),

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,50 @@ class K8sTriggerDeployer extends K8sDeployer<V1alpha1TableTrigger, V1alpha1Table
2020

2121
private final K8sContext context;
2222
private final Trigger trigger;
23+
private final K8sApi<V1alpha1TableTrigger, V1alpha1TableTriggerList> triggerApi;
2324
private final K8sApi<V1alpha1JobTemplate, V1alpha1JobTemplateList> jobTemplateApi;
2425

2526
K8sTriggerDeployer(Trigger trigger, K8sContext context) {
2627
super(context, K8sApiEndpoints.TABLE_TRIGGERS);
2728
this.context = context;
2829
this.trigger = trigger;
30+
this.triggerApi = new K8sApi<>(context, K8sApiEndpoints.TABLE_TRIGGERS);
2931
this.jobTemplateApi = new K8sApi<>(context, K8sApiEndpoints.JOB_TEMPLATES);
3032
}
3133

34+
@Override
35+
public void update() throws SQLException {
36+
if (trigger.options().containsKey(Trigger.PAUSED_OPTION)) {
37+
String pauseValue = trigger.options().get(Trigger.PAUSED_OPTION);
38+
String canonicalName = K8sUtils.canonicalizeName(trigger.name());
39+
V1alpha1TableTrigger existingTrigger = triggerApi.get(canonicalName);
40+
41+
if (existingTrigger == null) {
42+
throw new SQLException("Trigger " + trigger.name() + " not found.");
43+
}
44+
45+
V1alpha1TableTriggerSpec spec = existingTrigger.getSpec();
46+
if (spec == null) {
47+
spec = new V1alpha1TableTriggerSpec();
48+
existingTrigger.spec(spec);
49+
}
50+
spec.setPaused("true".equals(pauseValue));
51+
triggerApi.update(existingTrigger);
52+
return;
53+
}
54+
super.update();
55+
}
56+
57+
@Override
58+
public void delete() throws SQLException {
59+
String canonicalName = K8sUtils.canonicalizeName(trigger.name());
60+
V1alpha1TableTrigger existingTrigger = triggerApi.get(canonicalName);
61+
if (existingTrigger == null) {
62+
throw new SQLException("Trigger " + trigger.name() + " not found.");
63+
}
64+
triggerApi.delete(existingTrigger);
65+
}
66+
3267
@Override
3368
protected V1alpha1TableTrigger toK8sObject() throws SQLException {
3469
String name = K8sUtils.canonicalizeName(trigger.name(), trigger.job().name());

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTrigger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Trigger for a specific table.
3232
*/
3333
@ApiModel(description = "Trigger for a specific table.")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]")
3535
public class V1alpha1TableTrigger implements io.kubernetes.client.common.KubernetesObject {
3636
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3737
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* TableTriggerList is a list of TableTrigger
3333
*/
3434
@ApiModel(description = "TableTriggerList is a list of TableTrigger")
35-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]")
35+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]")
3636
public class V1alpha1TableTriggerList implements io.kubernetes.client.common.KubernetesListObject {
3737
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3838
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerSpec.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,16 @@
3131
* TableTrigger spec.
3232
*/
3333
@ApiModel(description = "TableTrigger spec.")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]")
3535
public class V1alpha1TableTriggerSpec {
3636
public static final String SERIALIZED_NAME_JOB_PROPERTIES = "jobProperties";
3737
@SerializedName(SERIALIZED_NAME_JOB_PROPERTIES)
3838
private Map<String, String> jobProperties = null;
3939

40+
public static final String SERIALIZED_NAME_PAUSED = "paused";
41+
@SerializedName(SERIALIZED_NAME_PAUSED)
42+
private Boolean paused;
43+
4044
public static final String SERIALIZED_NAME_SCHEDULE = "schedule";
4145
@SerializedName(SERIALIZED_NAME_SCHEDULE)
4246
private String schedule;
@@ -85,6 +89,29 @@ public void setJobProperties(Map<String, String> jobProperties) {
8589
}
8690

8791

92+
public V1alpha1TableTriggerSpec paused(Boolean paused) {
93+
94+
this.paused = paused;
95+
return this;
96+
}
97+
98+
/**
99+
* Whether the trigger is paused.
100+
* @return paused
101+
**/
102+
@javax.annotation.Nullable
103+
@ApiModelProperty(value = "Whether the trigger is paused.")
104+
105+
public Boolean getPaused() {
106+
return paused;
107+
}
108+
109+
110+
public void setPaused(Boolean paused) {
111+
this.paused = paused;
112+
}
113+
114+
88115
public V1alpha1TableTriggerSpec schedule(String schedule) {
89116

90117
this.schedule = schedule;
@@ -185,6 +212,7 @@ public boolean equals(Object o) {
185212
}
186213
V1alpha1TableTriggerSpec v1alpha1TableTriggerSpec = (V1alpha1TableTriggerSpec) o;
187214
return Objects.equals(this.jobProperties, v1alpha1TableTriggerSpec.jobProperties) &&
215+
Objects.equals(this.paused, v1alpha1TableTriggerSpec.paused) &&
188216
Objects.equals(this.schedule, v1alpha1TableTriggerSpec.schedule) &&
189217
Objects.equals(this.schema, v1alpha1TableTriggerSpec.schema) &&
190218
Objects.equals(this.table, v1alpha1TableTriggerSpec.table) &&
@@ -193,7 +221,7 @@ public boolean equals(Object o) {
193221

194222
@Override
195223
public int hashCode() {
196-
return Objects.hash(jobProperties, schedule, schema, table, yaml);
224+
return Objects.hash(jobProperties, paused, schedule, schema, table, yaml);
197225
}
198226

199227

@@ -202,6 +230,7 @@ public String toString() {
202230
StringBuilder sb = new StringBuilder();
203231
sb.append("class V1alpha1TableTriggerSpec {\n");
204232
sb.append(" jobProperties: ").append(toIndentedString(jobProperties)).append("\n");
233+
sb.append(" paused: ").append(toIndentedString(paused)).append("\n");
205234
sb.append(" schedule: ").append(toIndentedString(schedule)).append("\n");
206235
sb.append(" schema: ").append(toIndentedString(schema)).append("\n");
207236
sb.append(" table: ").append(toIndentedString(table)).append("\n");

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* TableTrigger status.
3030
*/
3131
@ApiModel(description = "TableTrigger status.")
32-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]")
32+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]")
3333
public class V1alpha1TableTriggerStatus {
3434
public static final String SERIALIZED_NAME_TIMESTAMP = "timestamp";
3535
@SerializedName(SERIALIZED_NAME_TIMESTAMP)

hoptimator-k8s/src/main/resources/tabletriggers.crd.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ spec:
4747
schedule:
4848
description: Cron schedule, e.g. "@hourly", which causes the trigger to fire on a schedule.
4949
type: string
50+
paused:
51+
description: Whether the trigger is paused.
52+
type: boolean
5053
required:
5154
- schema
5255
- table
@@ -65,6 +68,10 @@ spec:
6568
subresources:
6669
status: {}
6770
additionalPrinterColumns:
71+
- name: PAUSED
72+
type: boolean
73+
description: Whether trigger is paused.
74+
jsonPath: .spec.paused
6875
- name: SCHEMA
6976
type: string
7077
description: Schema name.

0 commit comments

Comments
 (0)