Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
fix BETWEEN operator; ensure task cancellation when terminating workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Jun 10, 2020
1 parent e4a0c6f commit 9df9e2b
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class MetricsContainer {
private static final String WORFLOW_TYPE = "workflowType";
private static final String WORKFLOW_VERSION = "version";
private static final String EXCEPTION = "exception";
private static final String NAME = "name";
private static final String ENTITY_NAME = "entityName";
private static final String OPERATION = "operation";
private static final String PAYLOAD_TYPE = "payload_type";

Expand Down Expand Up @@ -162,7 +162,7 @@ public static void recordWorkflowInputPayloadSize(String workflowType, String ve
}

public static void incrementExternalPayloadUsedCount(String name, String operation, String payloadType) {
incrementCount(EXTERNAL_PAYLOAD_USED, NAME, name, OPERATION, operation, PAYLOAD_TYPE, payloadType);
incrementCount(EXTERNAL_PAYLOAD_USED, ENTITY_NAME, name, OPERATION, operation, PAYLOAD_TYPE, payloadType);
}

public static void incrementWorkflowStartErrorCount(String workflowType, Throwable t) {
Expand All @@ -177,6 +177,6 @@ public static void incrementWorkflowStartErrorCount(String workflowType, Throwab
* @param className the name of the class which initialized the client
*/
public static void incrementInitializationCount(String className) {
incrementCount(CLIENT_INITIALIZED, NAME, className);
incrementCount(CLIENT_INITIALIZED, ENTITY_NAME, className);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,29 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
try {
executionLockService.acquireLock(workflow.getWorkflowId(), 60000);

List<Task> tasks = workflow.getTasks();
// Remove from the task queue if they were there
tasks.forEach(task -> queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()));

// Update non-terminal tasks' status to CANCELED
for (Task task : tasks) {
if (!task.getStatus().isTerminal()) {
// Cancel the ones which are not completed yet....
task.setStatus(CANCELED);
if (isSystemTask.test(task)) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
try {
workflowSystemTask.cancel(workflow, task, this);
} catch (Exception e) {
throw new ApplicationException(Code.INTERNAL_ERROR,
String.format("Error canceling system task: %s/%s", workflowSystemTask.getName(),
task.getTaskId()), e);
}
}
executionDAOFacade.updateTask(task);
}
}

if (!workflow.getStatus().isTerminal()) {
workflow.setStatus(WorkflowStatus.TERMINATED);
}
Expand All @@ -696,29 +719,6 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
workflow.setReasonForIncompletion(reason);
executionDAOFacade.updateWorkflow(workflow);

List<Task> tasks = workflow.getTasks();
for (Task task : tasks) {
if (!task.getStatus().isTerminal()) {
// Cancel the ones which are not completed yet....
task.setStatus(CANCELED);
if (isSystemTask.test(task)) {
WorkflowSystemTask stt = WorkflowSystemTask.get(task.getTaskType());
try {
stt.cancel(workflow, task, this);
} catch (Exception e) {
throw new ApplicationException(
Code.INTERNAL_ERROR,
String.format("Error canceling systems task: %s", stt.getName()),
e
);
}
}
executionDAOFacade.updateTask(task);
}
// And remove from the task queue if they were there
queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
}

// If the following lines, for some reason fails, the sweep will take
// care of this again!
if (workflow.getParentWorkflowId() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,14 @@ public List<Task> poll(String taskType, String workerId, String domain, int coun
if (task == null || task.getStatus().isTerminal()) {
// Remove taskId(s) without a valid Task/terminal state task from the queue
queueDAO.remove(queueName, taskId);
logger.debug("Removed taskId from the queue: {}, {}", queueName, taskId);
logger.debug("Removed task: {} from the queue: {}", taskId, queueName);
continue;
}

if (executionDAOFacade.exceedsInProgressLimit(task)) {
// Postpone a message, so that it would be available for poll again.
queueDAO.postpone(queueName, taskId, task.getWorkflowPriority(), queueTaskMessagePostponeSeconds);
logger.debug("Postponed task: {} in queue: {} by {} seconds", taskId, queueName, queueTaskMessagePostponeSeconds);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
/**
* Copyright 2016 Netflix, Inc.
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.elasticsearch.query.parser;

Expand All @@ -25,11 +19,11 @@
*/
public class ComparisonOp extends AbstractNode {

public static enum Operators {
public enum Operators {
BETWEEN("BETWEEN"), EQUALS("="), LESS_THAN("<"), GREATER_THAN(">"), IN("IN"), NOT_EQUALS("!="), IS("IS"),
STARTS_WITH("STARTS_WITH");

private String value;
private final String value;
Operators(String value){
this.value = value;
}
Expand All @@ -49,8 +43,7 @@ public String value(){

private static final int maxOperatorLength;

private static final int betwnLen = Operators.BETWEEN.value().length();

private static final int betweenLen = Operators.BETWEEN.value().length();
private static final int startsWithLen = Operators.STARTS_WITH.value().length();

private String value;
Expand All @@ -70,7 +63,7 @@ protected void _parse() throws Exception {
this.value = "IS";
}else if(peeked[0] == '!' && peeked[1] == '='){
this.value = "!=";
}else if(peeked.length == betwnLen && new String(peeked).equals(Operators.BETWEEN.value())) {
}else if(peeked.length >= betweenLen && peeked[0] == 'B' && peeked[1] == 'E' && peeked[2] == 'T' && peeked[3] == 'W' && peeked[4] == 'E' && peeked[5] == 'E' && peeked[6] == 'N'){
this.value = Operators.BETWEEN.value();
}else if(peeked.length == startsWithLen && new String(peeked).equals(Operators.STARTS_WITH.value())) {
this.value = Operators.STARTS_WITH.value();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
/**
* Copyright 2016 Netflix, Inc.
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.elasticsearch.query.parser;


import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.junit.Test;

/**
* @author Viren
*
Expand All @@ -32,7 +25,7 @@ public class TestComparisonOp extends AbstractParserTest {

@Test
public void test() throws Exception {
String[] tests = new String[]{"<",">","=","!=","IN","STARTS_WITH"};
String[] tests = new String[]{"<",">","=","!=","IN","BETWEEN","STARTS_WITH"};
for(String test : tests){
ComparisonOp name = new ComparisonOp(getInputStream(test));
String nameVal = name.getOperator();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
/*
* Copyright 2020 Netflix, Inc.
*
* http://www.apache.org/licenses/LICENSE-2.0
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.elasticsearch.query.parser;

Expand All @@ -25,11 +19,11 @@
*/
public class ComparisonOp extends AbstractNode {

public static enum Operators {
public enum Operators {
BETWEEN("BETWEEN"), EQUALS("="), LESS_THAN("<"), GREATER_THAN(">"), IN("IN"), NOT_EQUALS("!="), IS("IS"),
STARTS_WITH("STARTS_WITH");

private String value;
private final String value;
Operators(String value){
this.value = value;
}
Expand All @@ -49,8 +43,7 @@ public String value(){

private static final int maxOperatorLength;

private static final int betwnLen = Operators.BETWEEN.value().length();

private static final int betweenLen = Operators.BETWEEN.value().length();
private static final int startsWithLen = Operators.STARTS_WITH.value().length();

private String value;
Expand All @@ -70,7 +63,7 @@ protected void _parse() throws Exception {
this.value = "IS";
}else if(peeked[0] == '!' && peeked[1] == '='){
this.value = "!=";
}else if(peeked.length == betwnLen && new String(peeked).equals(Operators.BETWEEN.value())){
}else if(peeked.length >= betweenLen && peeked[0] == 'B' && peeked[1] == 'E' && peeked[2] == 'T' && peeked[3] == 'W' && peeked[4] == 'E' && peeked[5] == 'E' && peeked[6] == 'N'){
this.value = Operators.BETWEEN.value();
}else if(peeked.length == startsWithLen && new String(peeked).equals(Operators.STARTS_WITH.value())) {
this.value = Operators.STARTS_WITH.value();
Expand All @@ -89,5 +82,4 @@ public String toString(){
public String getOperator(){
return value;
}

}
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
/**
* Copyright 2016 Netflix, Inc.
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.elasticsearch.query.parser;


import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.junit.Test;

/**
* @author Viren
*
Expand All @@ -32,7 +25,7 @@ public class TestComparisonOp extends AbstractParserTest {

@Test
public void test() throws Exception {
String[] tests = new String[]{"<",">","=","!=","IN","STARTS_WITH"};
String[] tests = new String[]{"<",">","=","!=","IN","BETWEEN","STARTS_WITH"};
for(String test : tests){
ComparisonOp name = new ComparisonOp(getInputStream(test));
String nameVal = name.getOperator();
Expand Down

0 comments on commit 9df9e2b

Please sign in to comment.