From 18b12c8a1d4c5c3e921c56deaf2f08d1b215501a Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Mon, 22 May 2017 16:36:11 -0700 Subject: [PATCH 1/2] archival support --- .../com/netflix/conductor/dao/IndexDAO.java | 14 +++- .../dao/dynomite/RedisExecutionDAO.java | 64 ++++++++++++------- .../conductor/dao/index/ElasticSearchDAO.java | 49 ++++++++++---- .../dao/dynomite/RedisExecutionDAOTest.java | 9 +-- .../conductor/tests/utils/MockIndexDAO.java | 7 +- 5 files changed, 95 insertions(+), 48 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java index 53436f37a2..facf5142e0 100644 --- a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java @@ -63,10 +63,18 @@ public interface IndexDAO { /** * Updates the index * @param workflowInstanceId id of the workflow - * @param key key to be updated - * @param value value + * @param keys keys to be updated + * @param values values. Number of keys and values MUST match. */ - public void update(String workflowInstanceId, String key, Object value); + public void update(String workflowInstanceId, String[] keys, Object[] values); + + /** + * Retrieves a specific field from the index + * @param workflowInstanceId id of the workflow + * @param key field to be retrieved + * @return value of the field as string + */ + public String get(String workflowInstanceId, String key); /** * diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java index 57c402a23f..fba61445c0 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java @@ -51,6 +51,8 @@ public class RedisExecutionDAO extends BaseDynoDAO implements ExecutionDAO { + private static final String ARCHIVED_FIELD = "archived"; + private static final String RAW_JSON_FIELD = "rawJSON"; // Keys Families private static final String TASK_LIMIT_BUCKET = "TASK_LIMIT_BUCKET"; private final static String IN_PROGRESS_TASKS = "IN_PROGRESS_TASKS"; @@ -300,21 +302,28 @@ public String updateWorkflow(Workflow workflow) { @Override public void removeWorkflow(String workflowId) { - - Workflow wf = getWorkflow(workflowId, false); - // Remove from lists - String key = nsKey(WORKFLOW_DEF_TO_WORKFLOWS, wf.getWorkflowType(), dateStr(wf.getCreateTime())); - dynoClient.srem(key, workflowId); - dynoClient.srem(nsKey(CORR_ID_TO_WORKFLOWS, wf.getCorrelationId()), workflowId); - dynoClient.srem(nsKey(PENDING_WORKFLOWS, wf.getWorkflowType()), workflowId); - - // Remove the object - dynoClient.del(nsKey(WORKFLOW, workflowId)); - for(Task task : wf.getTasks()) { - removeTask(task.getTaskId()); - } - indexer.remove(workflowId); + try { + + Workflow wf = getWorkflow(workflowId, true); + + //Add to elasticsearch + indexer.update(workflowId, new String[]{RAW_JSON_FIELD, ARCHIVED_FIELD}, new Object[]{om.writeValueAsString(wf), true}); + + // Remove from lists + String key = nsKey(WORKFLOW_DEF_TO_WORKFLOWS, wf.getWorkflowType(), dateStr(wf.getCreateTime())); + dynoClient.srem(key, workflowId); + dynoClient.srem(nsKey(CORR_ID_TO_WORKFLOWS, wf.getCorrelationId()), workflowId); + dynoClient.srem(nsKey(PENDING_WORKFLOWS, wf.getWorkflowType()), workflowId); + // Remove the object + dynoClient.del(nsKey(WORKFLOW, workflowId)); + for(Task task : wf.getTasks()) { + removeTask(task.getTaskId()); + } + + }catch(Exception e) { + throw new ApplicationException(e.getMessage(), e); + } } @Override @@ -328,23 +337,28 @@ public Workflow getWorkflow(String workflowId) { } @Override - public Workflow getWorkflow(String workflowId, boolean includeTasks) { - Preconditions.checkNotNull(workflowId, "workflowId name cannot be null"); - - + public Workflow getWorkflow(String workflowId, boolean includeTasks) { String json = dynoClient.get(nsKey(WORKFLOW, workflowId)); + if(json != null) { + Workflow workflow = readValue(json, Workflow.class); + if (includeTasks) { + List tasks = getTasksForWorkflow(workflowId); + tasks.sort(Comparator.comparingLong(Task::getScheduledTime).thenComparingInt(Task::getSeq)); + workflow.setTasks(tasks); + } + return workflow; + } + + //try from the archive + json = indexer.get(workflowId, RAW_JSON_FIELD); if (json == null) { throw new ApplicationException(Code.NOT_FOUND, "No such workflow found by id: " + workflowId); } Workflow workflow = readValue(json, Workflow.class); - if (includeTasks) { - List tasks = getTasksForWorkflow(workflowId); - tasks.sort(Comparator.comparingLong(Task::getScheduledTime).thenComparingInt(Task::getSeq)); - workflow.setTasks(tasks); + if(!includeTasks) { + workflow.getTasks().clear(); } return workflow; - - } @Override @@ -541,5 +555,7 @@ public List getEventExecutions(String eventHandlerName, String e public void addMessage(String queue, Message msg) { indexer.addMessage(queue, msg); } + + } diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/index/ElasticSearchDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/index/ElasticSearchDAO.java index 6a2ac30ac2..477aff822b 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/dao/index/ElasticSearchDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/index/ElasticSearchDAO.java @@ -34,16 +34,21 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.get.GetField; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -349,19 +354,40 @@ public void remove(String workflowId) { } @Override - public void update(String workflowInstanceId, String key, Object value) { - try { - log.info("updating {} with {} and {}", workflowInstanceId, key, value); - UpdateRequest request = new UpdateRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId); - Map source = new HashMap<>(); + public void update(String workflowInstanceId, String[] keys, Object[] values) { + if(keys.length != values.length) { + throw new IllegalArgumentException("Number of keys and values should be same."); + } + + UpdateRequest request = new UpdateRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId); + Map source = new HashMap<>(); + + for (int i = 0; i < keys.length; i++) { + String key = keys[i]; + Object value= values[i]; + log.debug("updating {} with {} and {}", workflowInstanceId, key, value); source.put(key, value); - request.doc(source); - client.update(request).actionGet(); - - } catch(Throwable e) { - log.error("Index update failed {}", e.getMessage(), e); - Monitors.error(className, "update"); } + request.doc(source); + ActionFuture response = client.update(request); + response.actionGet(); + } + + @Override + public String get(String workflowInstanceId, String fieldToGet) { + Object value = null; + GetRequest request = new GetRequest(indexName, WORKFLOW_DOC_TYPE, workflowInstanceId).fields(fieldToGet); + GetResponse response = client.get(request).actionGet(); + Map fields = response.getFields(); + if(fields == null) { + return null; + } + GetField field = fields.get(fieldToGet); + if(field != null) value = field.getValue(); + if(value != null) { + return value.toString(); + } + return null; } private SearchResult search(String structuredQuery, int start, int size, List sortOptions, String freeTextQuery) throws ParserException { @@ -396,5 +422,4 @@ private SearchResult search(String structuredQuery, int start, int size, return new SearchResult(count, result); } - } diff --git a/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java b/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java index af6d360890..e1382dc2cf 100644 --- a/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java +++ b/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java @@ -51,13 +51,12 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.tasks.Task.Status; +import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.Workflow.WorkflowStatus; import com.netflix.conductor.config.TestConfiguration; import com.netflix.conductor.core.config.Configuration; -import com.netflix.conductor.core.execution.ApplicationException; import com.netflix.conductor.dao.index.ElasticSearchDAO; import com.netflix.conductor.dao.redis.JedisMock; @@ -447,12 +446,6 @@ public void test() throws Exception { count = dao.getPendingWorkflowCount(workflowName); assertEquals(0, count); - dao.removeWorkflow(workflowId); - expected.expect(ApplicationException.class); - expected.expectMessage("No such workflow found"); - found = dao.getWorkflow(workflowId); - - } } diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MockIndexDAO.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MockIndexDAO.java index b0aa0794de..d9f75ee3a4 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MockIndexDAO.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MockIndexDAO.java @@ -49,7 +49,7 @@ public void remove(String workflowId) { } @Override - public void update(String workflowInstanceId, String key, Object value) { + public void update(String workflowInstanceId, String[] key, Object[] value) { } @Override @@ -70,5 +70,10 @@ public void add(EventExecution ee) { public void addMessage(String queue, Message msg) { } + + @Override + public String get(String workflowInstanceId, String key) { + return null; + } } From 611555b8496b0f8b37f89a00feb9ef178d420f25 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Mon, 22 May 2017 17:49:05 -0700 Subject: [PATCH 2/2] logging --- .../com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java index fba61445c0..fb5fb2998a 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java @@ -229,6 +229,10 @@ public void addTaskExecLog(TaskExecLog log) { public void removeTask(String taskId) { Task task = getTask(taskId); + if(task == null) { + logger.warn("No such Task by id {}", taskId); + return; + } String taskKey = task.getReferenceTaskName() + "" + task.getRetryCount(); dynoClient.hdel(nsKey(SCHEDULED_TASKS, task.getWorkflowInstanceId()), taskKey);