Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #206 from Netflix/archive
Browse files Browse the repository at this point in the history
Archive Workflows
  • Loading branch information
v1r3n authored May 23, 2017
2 parents 4468667 + 611555b commit 495b341
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 48 deletions.
14 changes: 11 additions & 3 deletions core/src/main/java/com/netflix/conductor/dao/IndexDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,18 @@ public interface IndexDAO {
/**
* Updates the index
* @param workflowInstanceId id of the workflow
* @param key key to be updated
* @param value value
* @param keys keys to be updated
* @param values values. Number of keys and values MUST match.
*/
public void update(String workflowInstanceId, String key, Object value);
public void update(String workflowInstanceId, String[] keys, Object[] values);

/**
 * Retrieves a specific field of an indexed workflow document.
 *
 * @param workflowInstanceId id of the workflow
 * @param key field to be retrieved
 * @return value of the field as string; implementations may return
 *         {@code null} when the document or field is absent (the
 *         Elasticsearch implementation in this change does)
 */
public String get(String workflowInstanceId, String key);

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
public class RedisExecutionDAO extends BaseDynoDAO implements ExecutionDAO {


private static final String ARCHIVED_FIELD = "archived";
private static final String RAW_JSON_FIELD = "rawJSON";
// Keys Families
private static final String TASK_LIMIT_BUCKET = "TASK_LIMIT_BUCKET";
private final static String IN_PROGRESS_TASKS = "IN_PROGRESS_TASKS";
Expand Down Expand Up @@ -227,6 +229,10 @@ public void addTaskExecLog(TaskExecLog log) {
public void removeTask(String taskId) {

Task task = getTask(taskId);
if(task == null) {
logger.warn("No such Task by id {}", taskId);
return;
}
String taskKey = task.getReferenceTaskName() + "" + task.getRetryCount();

dynoClient.hdel(nsKey(SCHEDULED_TASKS, task.getWorkflowInstanceId()), taskKey);
Expand Down Expand Up @@ -300,21 +306,28 @@ public String updateWorkflow(Workflow workflow) {
@Override
public void removeWorkflow(String workflowId) {


Workflow wf = getWorkflow(workflowId, false);
// Remove from lists
String key = nsKey(WORKFLOW_DEF_TO_WORKFLOWS, wf.getWorkflowType(), dateStr(wf.getCreateTime()));
dynoClient.srem(key, workflowId);
dynoClient.srem(nsKey(CORR_ID_TO_WORKFLOWS, wf.getCorrelationId()), workflowId);
dynoClient.srem(nsKey(PENDING_WORKFLOWS, wf.getWorkflowType()), workflowId);

// Remove the object
dynoClient.del(nsKey(WORKFLOW, workflowId));
for(Task task : wf.getTasks()) {
removeTask(task.getTaskId());
}
indexer.remove(workflowId);
try {

Workflow wf = getWorkflow(workflowId, true);

//Add to elasticsearch
indexer.update(workflowId, new String[]{RAW_JSON_FIELD, ARCHIVED_FIELD}, new Object[]{om.writeValueAsString(wf), true});

// Remove from lists
String key = nsKey(WORKFLOW_DEF_TO_WORKFLOWS, wf.getWorkflowType(), dateStr(wf.getCreateTime()));
dynoClient.srem(key, workflowId);
dynoClient.srem(nsKey(CORR_ID_TO_WORKFLOWS, wf.getCorrelationId()), workflowId);
dynoClient.srem(nsKey(PENDING_WORKFLOWS, wf.getWorkflowType()), workflowId);

// Remove the object
dynoClient.del(nsKey(WORKFLOW, workflowId));
for(Task task : wf.getTasks()) {
removeTask(task.getTaskId());
}

}catch(Exception e) {
throw new ApplicationException(e.getMessage(), e);
}
}

@Override
Expand All @@ -328,23 +341,28 @@ public Workflow getWorkflow(String workflowId) {
}

@Override
public Workflow getWorkflow(String workflowId, boolean includeTasks) {
Preconditions.checkNotNull(workflowId, "workflowId name cannot be null");


public Workflow getWorkflow(String workflowId, boolean includeTasks) {
String json = dynoClient.get(nsKey(WORKFLOW, workflowId));
if(json != null) {
Workflow workflow = readValue(json, Workflow.class);
if (includeTasks) {
List<Task> tasks = getTasksForWorkflow(workflowId);
tasks.sort(Comparator.comparingLong(Task::getScheduledTime).thenComparingInt(Task::getSeq));
workflow.setTasks(tasks);
}
return workflow;
}

//try from the archive
json = indexer.get(workflowId, RAW_JSON_FIELD);
if (json == null) {
throw new ApplicationException(Code.NOT_FOUND, "No such workflow found by id: " + workflowId);
}
Workflow workflow = readValue(json, Workflow.class);
if (includeTasks) {
List<Task> tasks = getTasksForWorkflow(workflowId);
tasks.sort(Comparator.comparingLong(Task::getScheduledTime).thenComparingInt(Task::getSeq));
workflow.setTasks(tasks);
if(!includeTasks) {
workflow.getTasks().clear();
}
return workflow;


}

@Override
Expand Down Expand Up @@ -541,5 +559,7 @@ public List<EventExecution> getEventExecutions(String eventHandlerName, String e
/**
 * Records a message for the named queue by delegating to the index DAO.
 *
 * @param queue name of the queue the message belongs to
 * @param msg message to record; forwarded unchanged to the indexer
 */
public void addMessage(String queue, Message msg) {
indexer.addMessage(queue, msg);
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,21 @@

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -349,19 +354,40 @@ public void remove(String workflowId) {
}

@Override
public void update(String workflowInstanceId, String key, Object value) {
try {
log.info("updating {} with {} and {}", workflowInstanceId, key, value);
UpdateRequest request = new UpdateRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId);
Map<String, Object> source = new HashMap<>();
public void update(String workflowInstanceId, String[] keys, Object[] values) {
if(keys.length != values.length) {
throw new IllegalArgumentException("Number of keys and values should be same.");
}

UpdateRequest request = new UpdateRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId);
Map<String, Object> source = new HashMap<>();

for (int i = 0; i < keys.length; i++) {
String key = keys[i];
Object value= values[i];
log.debug("updating {} with {} and {}", workflowInstanceId, key, value);
source.put(key, value);
request.doc(source);
client.update(request).actionGet();

} catch(Throwable e) {
log.error("Index update failed {}", e.getMessage(), e);
Monitors.error(className, "update");
}
request.doc(source);
ActionFuture<UpdateResponse> response = client.update(request);
response.actionGet();
}

/**
 * Looks up a single stored field of an indexed workflow document.
 *
 * @param workflowInstanceId id of the workflow document to read
 * @param fieldToGet name of the stored field to fetch from the index
 * @return the field value rendered via {@code toString()}, or {@code null}
 *         when the document has no fields or the field is absent/empty
 */
@Override
public String get(String workflowInstanceId, String fieldToGet) {
    GetRequest request = new GetRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId).fields(fieldToGet);
    GetResponse response = client.get(request).actionGet();

    Map<String, GetField> fields = response.getFields();
    if (fields == null) {
        return null;
    }

    GetField field = fields.get(fieldToGet);
    if (field == null) {
        return null;
    }

    Object value = field.getValue();
    return (value == null) ? null : value.toString();
}

private SearchResult<String> search(String structuredQuery, int start, int size, List<String> sortOptions, String freeTextQuery) throws ParserException {
Expand Down Expand Up @@ -396,5 +422,4 @@ private SearchResult<String> search(String structuredQuery, int start, int size,
return new SearchResult<String>(count, result);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,12 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.Workflow.WorkflowStatus;
import com.netflix.conductor.config.TestConfiguration;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.dao.index.ElasticSearchDAO;
import com.netflix.conductor.dao.redis.JedisMock;

Expand Down Expand Up @@ -447,12 +446,6 @@ public void test() throws Exception {
count = dao.getPendingWorkflowCount(workflowName);
assertEquals(0, count);

dao.removeWorkflow(workflowId);
expected.expect(ApplicationException.class);
expected.expectMessage("No such workflow found");
found = dao.getWorkflow(workflowId);


}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void remove(String workflowId) {
}

@Override
public void update(String workflowInstanceId, String key, Object value) {
public void update(String workflowInstanceId, String[] key, Object[] value) {

}
@Override
Expand All @@ -70,5 +70,10 @@ public void add(EventExecution ee) {
/**
 * No-op mock: messages are discarded rather than recorded.
 *
 * @param queue name of the queue (ignored)
 * @param msg message to record (ignored)
 */
public void addMessage(String queue, Message msg) {

}

/**
 * Mock lookup: always reports the requested field as absent.
 *
 * @param workflowInstanceId id of the workflow (ignored)
 * @param key field to be retrieved (ignored)
 * @return always {@code null}
 */
@Override
public String get(String workflowInstanceId, String key) {
return null;
}

}

0 comments on commit 495b341

Please sign in to comment.