Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #345 from Netflix/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
v1r3n authored Oct 12, 2017
2 parents 22f9f91 + 1b1844b commit ff3298b
Show file tree
Hide file tree
Showing 32 changed files with 2,860 additions and 139 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ ui/.settings
.settings
dump.rdb
.idea
out/
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public void retryLastFailedTask(String workflowId) {
postForEntity1("workflow/{workflowId}/retry", workflowId);
}

public void resetCallbacksForInProgressTasks(String workflowId) {
postForEntity1("workflow/{workflowId}//{workflowId}/resetcallbacks", workflowId);
}

public void terminateWorkflow(String workflowId, String reason) {
delete(new Object[]{"reason", reason}, "workflow/{workflowId}", workflowId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,26 @@ public String startWorkflow(String name, int version, Map<String, Object> input,
}
}

public String resetCallbacksForInProgressTasks(String workflowId) throws Exception {
Workflow workflow = edao.getWorkflow(workflowId, true);
if (workflow.getStatus().isTerminal()) {
throw new ApplicationException(Code.CONFLICT, "Workflow is completed. status=" + workflow.getStatus());
}

// Get tasks that are in progress and have callbackAfterSeconds > 0
// and set the callbackAfterSeconds to 0;
for(Task t: workflow.getTasks()) {
if(t.getStatus().equals(Status.IN_PROGRESS) &&
t.getCallbackAfterSeconds() > 0){
if(queue.setOffsetTime(QueueUtils.getQueueName(t), t.getTaskId(), 0)){
t.setCallbackAfterSeconds(0);
edao.updateTask(t);
}
}
};
return workflowId;
}

public String rerun(RerunWorkflowRequest request) throws Exception {
Preconditions.checkNotNull(request.getReRunFromWorkflowId(), "reRunFromWorkflowId is missing");
if(!rerunWF(request.getReRunFromWorkflowId(), request.getReRunFromTaskId(), request.getTaskInput(),
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/QueueDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,14 @@ public default void processUnacks(String queueName) {

}

/**
* Sets the offset time without pulling out the message from the queue
* @param queueName name of the queue
* @param id message id
* @param offsetTimeInSecond time in seconds, after which the message should be marked visible. (for timed queues)
* @return true if the message is in queue and the change was successful else returns false
*/
public boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond);


}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ public void restart(@PathParam("workflowId") String workflowId) throws Exception
public void retry(@PathParam("workflowId") String workflowId) throws Exception {
executor.retry(workflowId);
}

@POST
@Path("/{workflowId}/resetcallbacks")
@ApiOperation("Resets callback times of all in_progress tasks to 0")
@Consumes(MediaType.WILDCARD)
public void reset(@PathParam("workflowId") String workflowId) throws Exception {
executor.resetCallbacksForInProgressTasks(workflowId);
}

@DELETE
@Path("/{workflowId}")
Expand Down
1 change: 1 addition & 0 deletions mysql-persistence/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
out/
20 changes: 20 additions & 0 deletions mysql-persistence/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
dependencies {
compile project(':conductor-core')
compile 'com.google.inject:guice:3.0'
compile 'org.elasticsearch:elasticsearch:2.+'

compile 'org.sql2o:sql2o:1.5.4'
compile 'commons-io:commons-io:2.4+'
compile 'mysql:mysql-connector-java:5.1.43'
compile 'com.zaxxer:HikariCP:2.6.3'
compile 'org.flywaydb:flyway-core:4.2.0'

testCompile 'ch.vorburger.mariaDB4j:mariaDB4j:2.2.3'
testCompile 'ch.qos.logback:logback-core:1.2.3'
testCompile 'ch.qos.logback:logback-classic:1.2.3'
}

test {
//the MySQL unit tests must run within the same JVM to share the same embedded DB
maxParallelForks = 1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.netflix.conductor.dao.mysql;

import static java.sql.Connection.TRANSACTION_READ_COMMITTED;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sql2o.Connection;
import org.sql2o.Sql2o;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;

abstract class MySQLBaseDAO {

private static final List<String> EXCLUDED_STACKTRACE_CLASS = ImmutableList.of("com.netflix.conductor.dao.mysql.MySQLBaseDAO", "java.lang.Thread");

protected final Sql2o sql2o;
protected final ObjectMapper om;
protected final Logger logger = LoggerFactory.getLogger(getClass());

protected MySQLBaseDAO(ObjectMapper om, Sql2o sql2o) {
this.om = om;
this.sql2o = sql2o;
}

protected String toJson(Object value) {
try {
return om.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

protected <T>T readValue(String json, Class<T> clazz) {
try {
return om.readValue(json, clazz);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

protected <R> R getWithTransaction(Function<Connection, R> function) {
Instant start = Instant.now();
StackTraceElement caller = Arrays.stream(Thread.currentThread().getStackTrace()).filter(ste -> !EXCLUDED_STACKTRACE_CLASS.contains(ste.getClassName())).findFirst().get();
logger.debug("{} : starting transaction", caller.getMethodName());
try (Connection connection = sql2o.beginTransaction(TRANSACTION_READ_COMMITTED)) {
final R result = function.apply(connection);
connection.commit();
return result;
} finally {
Instant end = Instant.now();
logger.debug("{} : took {}ms", caller.getMethodName(), Duration.between(start, end).toMillis());
}
}

protected void withTransaction(Consumer<Connection> consumer) {
getWithTransaction(connection -> {
consumer.accept(connection);
return null;
});
}

/**
* This will inject a series of p1, p2, ... placeholders in the given query template so it can then be used
* in conjunction with the withParams method on the Sql2o Query object.
*
* The withParams method in the Query class loops through each element in the given array and adds a prepared statement for each.
* For each element found in the array, a pN placeholder should exists in the query.
*
* This is useful for generating the IN clause since Sql2o does not support passing directly a list
*
* @param queryTemplate a query template with a %s placeholder where the variable size parameters placeholders should be injected
* @param numPlaceholders the number of placeholders to generated
* @return
*/
protected String generateQueryWithParametersListPlaceholders(String queryTemplate, int numPlaceholders) {
String paramsPlaceholders = String.join(",", IntStream.rangeClosed(1, numPlaceholders).mapToObj(paramNumber -> ":p" + paramNumber).collect(Collectors.toList()));
return String.format(queryTemplate, paramsPlaceholders);
}
}
Loading

0 comments on commit ff3298b

Please sign in to comment.