diff --git a/client/build.gradle b/client/build.gradle
index b8c443b358..b2472b2a2a 100644
--- a/client/build.gradle
+++ b/client/build.gradle
@@ -7,4 +7,6 @@ dependencies {
     compile 'com.netflix.eureka:eureka-client:latest.release'
     compile 'com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.7.5'
     compile 'com.netflix.archaius:archaius-core:0.7.5'
+
+    testCompile 'org.slf4j:slf4j-log4j12:1.8.0-alpha1'
 }
diff --git a/client/python/conductor/ConductorWorker.py b/client/python/conductor/ConductorWorker.py
index d2cd985089..0a2b804254 100644
--- a/client/python/conductor/ConductorWorker.py
+++ b/client/python/conductor/ConductorWorker.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
+#
 import sys
 import time
 import subprocess
@@ -37,6 +37,7 @@ def execute(self, task, exec_function):
                 raise Exception('Task execution function MUST return a response as a dict with status and output fields')
             task['status'] = resp['status']
             task['outputData'] = resp['output']
+            task['logs'] = resp['logs']
             self.taskClient.updateTask(task)
         except Exception as err:
             print 'Error executing task: ' + str(err)
diff --git a/client/python/kitchensink_workers.py b/client/python/kitchensink_workers.py
index c7720dc6e7..007aaae744 100644
--- a/client/python/kitchensink_workers.py
+++ b/client/python/kitchensink_workers.py
@@ -1,15 +1,15 @@
 from conductor.ConductorWorker import ConductorWorker
 
 def execute(task):
-    return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0}}
+    return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0}, 'logs': ['one','two']}
 
 def execute4(task):
     forkTasks = [{"name": "task_1", "taskReferenceName": "task_1_1", "type": "SIMPLE"},{"name": "sub_workflow_4", "taskReferenceName": "wf_dyn", "type": "SUB_WORKFLOW", "subWorkflowParam": {"name": "sub_flow_1"}}];
     input = {'task_1_1': {}, 'wf_dyn': {}}
-    return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0, 'dynamicTasks': forkTasks, 'inputs': input}}
+    return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0, 'dynamicTasks': forkTasks, 'inputs': input}, 'logs': ['one','two']}
 
 def main():
-    print 'Hello World'
+    print 'Starting Kitchensink workflows'
     cc = ConductorWorker('http://localhost:8080/api', 1, 0.1)
     for x in range(1, 30):
         if(x == 4):
diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
index d4f538ce86..d2b61ec3be 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
@@ -144,6 +144,10 @@ public void updateTask(TaskResult task) {
         postForEntity("tasks", task);
     }
 
+    public void log(String taskId, String logMessage) {
+        postForEntity("tasks/" + taskId + "/log", logMessage);
+    }
+
     /**
      * Ack for the task poll
      * @param taskId Id of the task to be polled
diff --git a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java
index 19cf511874..d9e401a4fc 100644
--- a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java
+++ b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java
@@ -15,13 +15,11 @@
  */
 package com.netflix.conductor.client.task;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -40,7 +38,6 @@
 import com.netflix.conductor.client.worker.PropertyFactory;
 import com.netflix.conductor.client.worker.Worker;
 import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
 import com.netflix.conductor.common.metadata.tasks.TaskResult;
 import com.netflix.discovery.EurekaClient;
 import com.netflix.servo.monitor.Stopwatch;
@@ -72,9 +69,8 @@ public class WorkflowTaskCoordinator {
 
     private int threadCount;
 
-    private static Map<Worker, Map<String, String>> environmentData = new HashMap<>();
-
     private static final String DOMAIN = "domain";
+
     private static final String ALL_WORKERS = "all";
 
     /**
@@ -253,7 +249,6 @@ public Thread newThread(Runnable r) {
         });
         this.ses = Executors.newScheduledThreadPool(workers.size());
         workers.forEach(worker -> {
-            environmentData.put(worker, getEnvData(worker));
             ses.scheduleWithFixedDelay(()->pollForTask(worker), worker.getPollingInterval(), worker.getPollingInterval(), TimeUnit.MILLISECONDS);
         });
@@ -340,13 +335,16 @@ private void execute(Worker worker, Task task) {
 
         } catch (Exception e) {
             logger.error("Unable to execute task {}", task, e);
+            if (result == null) {
+                task.setStatus(Task.Status.FAILED);
+                result = new TaskResult(task);
+            }
             handleException(e, result, worker, false, task);
         } finally {
             sw.stop();
         }
 
         logger.debug("Task {} executed by worker {} with status {}", task.getTaskId(), worker.getClass().getSimpleName(), task.getStatus());
-        result.getLog().getEnvironment().putAll(environmentData.get(worker));
         updateWithRetry(updateRetryCount, task, result, worker);
     }
@@ -383,28 +381,6 @@ public int getUpdateRetryCount() {
         return updateRetryCount;
     }
 
-    static Map<String, String> getEnvData(Worker worker) {
-        List<String> props = worker.getLoggingEnvProps();
-        Map<String, String> data = new HashMap<>();
-        if(props == null || props.isEmpty()) {
-            return data;
-        }
-        String workerName = worker.getTaskDefName();
-        for(String property : props) {
-            property = property.trim();
-            String defaultValue = System.getenv(property);
-            String value = PropertyFactory.getString(workerName, property, defaultValue);
-            data.put(property, value);
-        }
-
-        try {
-            data.put("HOSTNAME", InetAddress.getLocalHost().getHostName());
-        } catch (UnknownHostException e) {
-
-        }
-        return data;
-    }
-
     private void updateWithRetry(int count, Task task, TaskResult result, Worker worker) {
 
         if(count < 0) {
@@ -432,11 +408,12 @@ private void handleException(Throwable t, TaskResult result, Worker worker, bool
         WorkflowTaskMetrics.executionException(worker.getTaskDefName(), t);
         result.setStatus(TaskResult.Status.FAILED);
         result.setReasonForIncompletion("Error while executing the task: " + t);
-        TaskExecLog execLog = result.getLog();
-        execLog.setError(t.getMessage());
-        for (StackTraceElement ste : t.getStackTrace()) {
-            execLog.getErrorTrace().add(ste.toString());
-        }
+
+        StringWriter sw = new StringWriter();
+        t.printStackTrace(new PrintWriter(sw));
+        result.log(sw.toString());
+
         updateWithRetry(updateRetryCount, task, result, worker);
     }
+
 }
diff --git a/client/src/main/java/com/netflix/conductor/client/worker/Worker.java b/client/src/main/java/com/netflix/conductor/client/worker/Worker.java
index fc04b3f00e..9be1f3b702 100644
--- a/client/src/main/java/com/netflix/conductor/client/worker/Worker.java
+++ b/client/src/main/java/com/netflix/conductor/client/worker/Worker.java
@@ -17,8 +17,6 @@
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.List;
 import java.util.function.Function;
 
 import com.netflix.conductor.common.metadata.tasks.Task;
@@ -30,8 +28,7 @@
  *
  */
 public interface Worker {
-
-
+
     public String getTaskDefName();
 
     /**
@@ -102,15 +99,7 @@ public default int getPollCount() {
     public default int getPollingInterval() {
         return PropertyFactory.getInteger(getTaskDefName(), "pollInterval", 1000);
     }
-
-    /**
-     *
-     * @return Returns a list of environment or system variables that should be logged
-     */
-    public default List<String> getLoggingEnvProps() {
-        String keys = PropertyFactory.getString(getTaskDefName(), "taskLogProps", "HOSTNAME,USER,EC2_INSTANCE_ID");
-        return Arrays.asList(keys.split(","));
-    }
+
     /**
      *
     * @return Time to wait when making a poll to workflow server for tasks.  The client will wait for at-least specified seconds for task queue to be "filled".
diff --git a/client/src/test/java/com/netflix/conductor/client/task/WorkflowTaskCoordinatorTests.java b/client/src/test/java/com/netflix/conductor/client/task/WorkflowTaskCoordinatorTests.java
index 82bcf6a0cc..ec9888c869 100644
--- a/client/src/test/java/com/netflix/conductor/client/task/WorkflowTaskCoordinatorTests.java
+++ b/client/src/test/java/com/netflix/conductor/client/task/WorkflowTaskCoordinatorTests.java
@@ -19,51 +19,31 @@
 package com.netflix.conductor.client.task;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.netflix.conductor.client.http.TaskClient;
 import com.netflix.conductor.client.worker.Worker;
 import com.netflix.conductor.common.metadata.tasks.Task;
 import com.netflix.conductor.common.metadata.tasks.TaskResult;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 /**
  * @author Viren
 *
 */
 public class WorkflowTaskCoordinatorTests {
-
-    @Test
-    public void testLoggingEnvironment() {
-        Worker worker = Worker.create("test", (Task task)-> new TaskResult(task));
-        List<String> keys = worker.getLoggingEnvProps();
-
-        Map<String, String> env = WorkflowTaskCoordinator.getEnvData(worker);
-        assertNotNull(env);
-        assertTrue(!env.isEmpty());
-        Set<String> loggedKeys = env.keySet();
-        for(String key : keys) {
-            assertTrue(loggedKeys.contains(key));
-        }
-    }
 
     @Test(expected=IllegalArgumentException.class)
     public void testNoWorkersException() {
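For context, a minimal illustrative sketch (not part of the change set) of how a worker uses the reworked client-side logging: log lines are attached to the TaskResult and shipped with the normal task update, and TaskClient.log posts a line straight to the new endpoint. The worker name "sample_task", the log text, the task id, and the setRootURI bootstrap are assumptions made only for this example.

import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;

public class SampleLoggingWorker {

    public static void main(String[] args) {
        TaskClient taskClient = new TaskClient();
        taskClient.setRootURI("http://localhost:8080/api/");

        // Worker that records free-form log lines on its result; the coordinator now
        // sends these with the task update instead of the removed environment snapshot.
        Worker worker = Worker.create("sample_task", (Task task) -> {
            TaskResult result = new TaskResult(task);
            result.log("started " + task.getTaskId());
            result.log("input keys: " + task.getInputData().keySet());
            result.setStatus(TaskResult.Status.COMPLETED);
            return result;
        });

        // Register 'worker' with a WorkflowTaskCoordinator exactly as before (omitted here).

        // Ad-hoc lines can also be posted outside of a task update via the new endpoint.
        taskClient.log("some-task-id", "posted directly to tasks/{taskId}/log");
    }
}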
diff --git a/client/src/test/resources/log4j.properties b/client/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..5e31e3c26f
--- /dev/null
+++ b/client/src/test/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
index b15b29322d..91667d6639 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
@@ -506,7 +506,7 @@ public WorkflowTask getWorkflowTask() {
 
     /**
      *
-     * @param workflowTask
+     * @param workflowTask Task definition
     */
     public void setWorkflowTask(WorkflowTask workflowTask) {
         this.workflowTask = workflowTask;
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskExecLog.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskExecLog.java
index 7b7a9d4be7..833e4847cf 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskExecLog.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskExecLog.java
@@ -18,50 +18,23 @@
 */
 package com.netflix.conductor.common.metadata.tasks;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 /**
 * @author Viren
 * Model that represents the task's execution log.
 */
 public class TaskExecLog {
-
-    private Map<String, String> environment = new HashMap<>();
-
-    private List<String> logs = Collections.synchronizedList(new ArrayList<>());
-
-    private List<String> errorTrace = new LinkedList<>();
-    private String error;
+    private List<String> logs = new LinkedList<>();
 
     private String taskId;
 
-    private String createdTime;
+    private long createdTime;
+
+    public TaskExecLog() {}
 
-    public TaskExecLog() {
-
-    }
-
-    /**
-     * @return the environment
-     */
-    public Map<String, String> getEnvironment() {
-        return environment;
-    }
-
-    /**
-     * @param environment the environment to set
-     *
-     */
-    public void setEnvironment(Map<String, String> environment) {
-        this.environment = environment;
-    }
-
     /**
      *
     * @return Task Execution Logs
     */
@@ -72,49 +45,11 @@ public List<String> getLogs() {
 
     /**
      *
-     * @param logs Log entries to set
+     * @param logs Task Execution Logs
     */
     public void setLogs(List<String> logs) {
         this.logs = logs;
     }
-
-    /**
-     *
-     * @param log adds a log entry.  The object is toString'ed and added to the logs
-     */
-    public void log(Object log) {
-        this.logs.add(log.toString());
-    }
-
-    /**
-     * @return the errorTrace
-     */
-    public List<String> getErrorTrace() {
-        return errorTrace;
-    }
-
-    /**
-     * @param errorTrace the errorTrace to set
-     *
-     */
-    public void setErrorTrace(List<String> errorTrace) {
-        this.errorTrace = errorTrace;
-    }
-
-    /**
-     * @return the error
-     */
-    public String getError() {
-        return error;
-    }
-
-    /**
-     * @param error the error to set
-     *
-     */
-    public void setError(String error) {
-        this.error = error;
-    }
 
     /**
      * @return the taskId
@@ -132,21 +67,19 @@ public void setTaskId(String taskId) {
     }
 
     /**
-     *
-     * @return Creation time (server side) when the log was added
+     * @return the createdTime
     */
-    public String getCreatedTime() {
+    public long getCreatedTime() {
         return createdTime;
     }
-
+
     /**
+     * @param createdTime the createdTime to set
      *
-     * @param createdTime creation time to set
     */
-    public void setCreatedTime(String createdTime) {
+    public void setCreatedTime(long createdTime) {
         this.createdTime = createdTime;
     }
-
 }
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
index f275766923..c406c0f77d 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
@@ -19,7 +19,9 @@
 package com.netflix.conductor.common.metadata.tasks;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
 * @author Viren
@@ -47,7 +49,7 @@ public enum Status {
 
     private Map<String, Object> outputData = new HashMap<>();
 
-    private TaskExecLog log = new TaskExecLog();
+    private List<String> logs = new CopyOnWriteArrayList<>();
 
     public TaskResult(Task task) {
         this.workflowInstanceId = task.getWorkflowInstanceId();
@@ -163,28 +165,32 @@ public TaskResult addOutputData(String key, Object value) {
     }
 
     /**
-     * @return the task execution log
+     *
+     * @return Task execution logs
     */
-    public TaskExecLog getLog() {
-        return log;
+    public List<String> getLogs() {
+        return logs;
     }
-
+
     /**
-     * @param log task execution log
      *
+     * @param logs Task execution logs
     */
-    public void setLog(TaskExecLog log) {
-        this.log = log;
+    public void setLogs(List<String> logs) {
+        this.logs = logs;
     }
+
     /**
      *
-     * @param log adds a log entry.  The object is toString'ed and added to the logs
+     * @param log Log line to be added
+     * @return Instance of TaskResult
     */
-    public void log(Object log) {
-        getLog().log(log);
+    public TaskResult log(String log) {
+        this.logs.add(log);
+        return this;
     }
-
+
     @Override
     public String toString() {
         return "TaskResult [workflowInstanceId=" + workflowInstanceId + ", taskId=" + taskId + ", status=" + status + "]";
diff --git a/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java b/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java
index 62d6083778..f85ccde889 100644
--- a/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java
+++ b/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java
@@ -60,6 +60,10 @@ public class TaskSummary {
 
     private String taskType;
 
+    private String input;
+
+    private String output;
+
     public TaskSummary(Task task) {
 
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
@@ -76,7 +80,15 @@ public TaskSummary(Task task) {
         this.status = task.getStatus();
         this.reasonForIncompletion = task.getReasonForIncompletion();
         this.queueWaitTime = task.getQueueWaitTime();
-
+        if (task.getInputData() != null) {
+            this.input = task.getInputData().toString();
+        }
+
+        if (task.getOutputData() != null) {
+            this.output = task.getOutputData().toString();
+        }
+
+
         if(task.getEndTime() > 0){
             this.executionTime = task.getEndTime() - task.getStartTime();
         }
@@ -259,5 +271,36 @@ public void setTaskType(String taskType) {
         this.taskType = taskType;
     }
 
+    /**
+     *
+     * @return input to the task
+     */
+    public String getInput() {
+        return input;
+    }
+
+    /**
+     *
+     * @param input input to the task
+     */
+    public void setInput(String input) {
+        this.input = input;
+    }
+
+    /**
+     *
+     * @return output of the task
+     */
+    public String getOutput() {
+        return output;
+    }
+
+    /**
+     *
+     * @param output Task output
+     */
+    public void setOutput(String output) {
+        this.output = output;
+    }
 }
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/http/HttpTask.java b/contribs/src/main/java/com/netflix/conductor/contribs/http/HttpTask.java
index dfe8610113..cf5c3a748e 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/http/HttpTask.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/http/HttpTask.java
@@ -117,10 +117,15 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) throw
         try {
 
             HttpResponse response = httpCall(input);
+            logger.info("response {}, {}", response.statusCode, response.body);
             if(response.statusCode > 199 && response.statusCode < 300) {
                 task.setStatus(Status.COMPLETED);
             } else {
-                task.setReasonForIncompletion(response.body.toString());
+                if(response.body != null) {
+                    task.setReasonForIncompletion(response.body.toString());
+                } else {
+                    task.setReasonForIncompletion("No response from the remote service");
+                }
                 task.setStatus(Status.FAILED);
             }
             if(response != null) {
@@ -164,9 +169,9 @@ protected HttpResponse httpCall(Input input) throws Exception {
             return response;
 
         } catch(UniformInterfaceException ex) {
-
+            logger.error(ex.getMessage(), ex);
             ClientResponse cr = ex.getResponse();
-
+            logger.error("Status Code: {}", cr.getStatus());
             if(cr.getStatus() > 199 && cr.getStatus() < 300) {
                 if(cr.getStatus() != 204 && cr.hasEntity()) {
@@ -187,7 +192,7 @@ protected HttpResponse httpCall(Input input) throws Exception {
 
     private Object extractBody(ClientResponse cr) {
         String json = cr.getEntity(String.class);
-        logger.debug(json);
+        logger.info(json);
 
         try {
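To make the reworked TaskResult contract concrete, a small illustrative JUnit sketch follows (not part of the change set); the class name and log text are invented, and it relies only on the accessors shown above: the chaining log(String), getLogs(), and setLogs(List<String>).

import static org.junit.Assert.assertEquals;

import java.util.Arrays;

import org.junit.Test;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;

public class TaskResultLogsSketch {

    @Test
    public void logLinesAccumulateOnTheResult() {
        TaskResult result = new TaskResult(new Task());

        // log(String) appends a plain line and returns the result, so calls can be chained
        result.log("step one").log("step two");
        assertEquals(Arrays.asList("step one", "step two"), result.getLogs());

        // setLogs(List<String>) replaces the whole list, matching the simplified POJO shape
        result.setLogs(Arrays.asList("only line"));
        assertEquals(1, result.getLogs().size());
    }
}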
diff --git a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java
index d142c4a5c1..6ce68502f0 100644
--- a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java
+++ b/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java
@@ -148,7 +148,7 @@ private void handle(ObservableQueue queue, Message msg) {
 
             for(EventHandler handler : handlers) {
 
                 String condition = handler.getCondition();
-                logger.info("condition: {}", condition);
+                logger.debug("condition: {}", condition);
                 if(!StringUtils.isEmpty(condition)) {
                     Boolean success = ScriptEvaluator.evalBool(condition, payloadObj);
                     if(!success) {
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
index 5812aaa953..22b54206de 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
@@ -514,7 +514,7 @@ private List<Task> getTasksToBeScheduled(WorkflowDef def, Workflow workflow, Wor
             break;
         case WAIT:
             Map<String, Object> waitTaskInput = pu.getTaskInputV2(taskToSchedule.getInputParameters(), workflow, taskId, null);
-            Task waitTask = SystemTask.waitTask(workflow, workflow.getCorrelationId(), taskToSchedule, waitTaskInput);
+            Task waitTask = SystemTask.waitTask(workflow, taskId, taskToSchedule, waitTaskInput);
             tasks.add(waitTask);
             break;
         default:
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
index 3379de0004..f66eea4d3a 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
@@ -447,8 +447,10 @@ public void updateTask(TaskResult result) throws Exception {
         }
 
         edao.updateTask(task);
-        TaskExecLog tlog = result.getLog();
+        TaskExecLog tlog = new TaskExecLog();
         tlog.setTaskId(task.getTaskId());
+        tlog.setLogs(result.getLogs());
+        tlog.setCreatedTime(System.currentTimeMillis());
         edao.addTaskExecLog(tlog);
 
         switch (task.getStatus()) {
diff --git a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java
index facf5142e0..1dcbc25bc8 100644
--- a/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java
+++ b/core/src/main/java/com/netflix/conductor/dao/IndexDAO.java
@@ -82,6 +82,13 @@ public interface IndexDAO {
      */
     public void add(TaskExecLog log);
 
+    /**
+     *
+     * @param taskId Id of the task for which to fetch the execution logs
+     * @return Returns the task execution logs for given task id
+     */
+    public List<TaskExecLog> getTaskLogs(String taskId);
+
     /**
      *
diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
index ccba43be5c..cadd92b425 100644
--- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
+++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
@@ -33,6 +33,7 @@
 import com.netflix.conductor.common.metadata.tasks.PollData;
 import com.netflix.conductor.common.metadata.tasks.Task;
 import com.netflix.conductor.common.metadata.tasks.Task.Status;
+import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
 import com.netflix.conductor.common.metadata.tasks.TaskResult;
 import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
 import com.netflix.conductor.common.run.SearchResult;
@@ -336,11 +337,40 @@ public boolean addEventExecution(EventExecution ee) {
         return edao.addEventExecution(ee);
     }
+
     public void updateEventExecution(EventExecution ee) {
         edao.updateEventExecution(ee);
     }
 
-    public void addMessage(String name, Message msg) {
-        edao.addMessage(name, msg);
+    /**
+     *
+     * @param queue Name of the registered queue
+     * @param msg Message
+     */
+    public void addMessage(String queue, Message msg) {
+        edao.addMessage(queue, msg);
+    }
+
+    /**
+     * Adds task logs
+     * @param taskId Id of the task
+     * @param log logs
+     */
+    public void log(String taskId, String log) {
+        TaskExecLog executionLog = new TaskExecLog();
+        executionLog.setTaskId(taskId);
+        executionLog.getLogs().add(log);
+        executionLog.setCreatedTime(System.currentTimeMillis());
+        edao.addTaskExecLog(executionLog);
     }
+
+    /**
+     *
+     * @param taskId Id of the task for which to retrieve logs
+     * @return Execution Logs (logged by the worker)
+     */
+    public List<TaskExecLog> getTaskLogs(String taskId) {
+        return indexer.getTaskLogs(taskId);
+    }
+
 }
diff --git a/jersey/src/main/java/com/netflix/conductor/server/resources/TaskResource.java b/jersey/src/main/java/com/netflix/conductor/server/resources/TaskResource.java
index 510b97e4cf..d9e33e4e21 100644
--- a/jersey/src/main/java/com/netflix/conductor/server/resources/TaskResource.java
+++ b/jersey/src/main/java/com/netflix/conductor/server/resources/TaskResource.java
@@ -38,6 +38,7 @@
 
 import com.netflix.conductor.common.metadata.tasks.PollData;
 import com.netflix.conductor.common.metadata.tasks.Task;
+import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
 import com.netflix.conductor.common.metadata.tasks.TaskResult;
 import com.netflix.conductor.dao.QueueDAO;
 import com.netflix.conductor.service.ExecutionService;
@@ -126,6 +127,20 @@ public String updateTask(TaskResult task) throws Exception {
     public String ack(@PathParam("taskId") String taskId, @QueryParam("workerid") String workerId) throws Exception {
         return "" + taskService.ackTaskRecieved(taskId, workerId);
     }
+
+    @POST
+    @Path("/{taskId}/log")
+    @ApiOperation("Log Task Execution Details")
+    public void log(@PathParam("taskId") String taskId, String log) throws Exception {
+        taskService.log(taskId, log);
+    }
+
+    @GET
+    @Path("/{taskId}/log")
+    @ApiOperation("Get Task Execution Logs")
+    public List<TaskExecLog> getTaskLogs(@PathParam("taskId") String taskId) throws Exception {
+        return taskService.getTaskLogs(taskId);
+    }
 
     @GET
     @Path("/{taskId}")
diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java
index 4691044154..c8c5b4aeac 100644
--- a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java
+++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java
@@ -185,8 +185,6 @@ public void updateTask(Task task) {
             dynoClient.srem(nsKey(IN_PROGRESS_TASKS, task.getTaskDefName()), task.getTaskId());
         }
 
-
-
         indexer.index(task);
     }
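The two resource methods above expose POST /api/tasks/{taskId}/log and GET /api/tasks/{taskId}/log. Purely as an illustration (not part of the change set), a Jersey 1.x client could exercise them roughly as follows; the base URL, the task id, and the assumption that a Jackson JSON provider is registered with the client are placeholders.

import java.util.List;

import javax.ws.rs.core.MediaType;

import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.WebResource;

public class TaskLogEndpointSketch {

    public static void main(String[] args) {
        // Assumes a locally running server and an existing task id (placeholder value).
        String taskId = "9dea4567-0240-4eab-bde8-99f4535ea3fc";
        WebResource logResource = Client.create().resource("http://localhost:8080/api/tasks/" + taskId + "/log");

        // POST a free-form line; the server wraps it in a TaskExecLog (see ExecutionService.log)
        // and hands it to the index DAO.
        logResource.type(MediaType.APPLICATION_JSON).post("worker checkpoint reached");

        // GET returns whatever the index currently holds for the task.
        List<TaskExecLog> logs = logResource.accept(MediaType.APPLICATION_JSON)
                .get(new GenericType<List<TaskExecLog>>() {});
        logs.forEach(log -> System.out.println(log.getCreatedTime() + " " + log.getLogs()));
    }
}

One detail worth verifying in the ElasticSearchDAO change that follows: add(TaskExecLog) indexes log documents under LOG_DOC_TYPE, while getTaskLogs queries with setTypes(TASK_DOC_TYPE); if the GET endpoint comes back empty, that type mismatch is the first thing to check.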
diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/index/ElasticSearchDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/index/ElasticSearchDAO.java
index 477aff822b..705de22297 100644
--- a/redis-persistence/src/main/java/com/netflix/conductor/dao/index/ElasticSearchDAO.java
+++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/index/ElasticSearchDAO.java
@@ -20,6 +20,7 @@
 
 import java.io.InputStream;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -54,6 +55,7 @@
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.sort.SortOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,7 +114,7 @@ public class ElasticSearchDAO implements IndexDAO {
 
     private static final TimeZone gmt = TimeZone.getTimeZone("GMT");
 
-    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMww");
+    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMWW");
 
     static {
         sdf.setTimeZone(gmt);
@@ -217,7 +219,7 @@ public void index(Workflow workflow) {
     @Override
     public void index(Task task) {
         try {
-
+
             String id = task.getTaskId();
             TaskSummary summary = new TaskSummary(task);
             byte[] doc = om.writeValueAsBytes(summary);
@@ -234,15 +236,15 @@ public void index(Task task) {
 
     @Override
     public void add(TaskExecLog taskExecLog) {
-        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-        sdf2.setTimeZone(gmt);
-        String created = sdf2.format(new Date());
+
+        if (taskExecLog.getLogs().isEmpty()) {
+            return;
+        }
+
         int retry = 3;
         while(retry > 0) {
             try {
-
-                taskExecLog.setCreatedTime(created);
                 IndexRequest request = new IndexRequest(logIndexName, LOG_DOC_TYPE);
                 request.source(om.writeValueAsBytes(taskExecLog));
                 client.index(request).actionGet();
@@ -259,6 +261,38 @@ public void add(TaskExecLog taskExecLog) {
 
     }
 
+
+    public List<TaskExecLog> getTaskLogs(String taskId) {
+
+        try {
+
+            QueryBuilder qf = QueryBuilders.matchAllQuery();
+            Expression expression = Expression.fromString("taskId='" + taskId + "'");
+            qf = expression.getFilterBuilder();
+
+            BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(qf);
+            QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery("*");
+            BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery);
+
+            final SearchRequestBuilder srb = client.prepareSearch(logIndexPrefix + "*").setQuery(fq).setTypes(TASK_DOC_TYPE);
+            SearchResponse response = srb.execute().actionGet();
+            SearchHit[] hits = response.getHits().getHits();
+            List<TaskExecLog> logs = new ArrayList<>(hits.length);
+            for(SearchHit hit : hits) {
+                String source = hit.getSourceAsString();
+                TaskExecLog tel = om.readValue(source, TaskExecLog.class);
+                logs.add(tel);
+            }
+
+            return logs;
+
+        }catch(Exception e) {
+            log.error(e.getMessage(), e);
+        }
+
+        return null;
+    }
+
     @Override
     public void addMessage(String queue, Message msg) {
 
@@ -421,5 +455,4 @@ private SearchResult<String> search(String structuredQuery, int start, int size,
         long count = response.getHits().getTotalHits();
         return new SearchResult<String>(count, result);
     }
-
 }
diff --git a/redis-persistence/src/main/resources/template.json b/redis-persistence/src/main/resources/template.json
index e35096061b..e4804fe8c4 100644
--- a/redis-persistence/src/main/resources/template.json
+++ b/redis-persistence/src/main/resources/template.json
@@ -1,41 +1,61 @@
 {
-  "template": "*",
-  "order": 0,
-  "mappings": {
-    "_default_": {
-      "dynamic_templates": [
-        {
-          "string_fields": {
-            "mapping": {
-              "index": "not_analyzed",
-              "type": "string",
-              "doc_values": true
-            },
-            "match": "*",
-            "match_mapping_type": "string"
"string" - } + "template": "*", + "order": 0, + "mappings": { + "task": { + "properties": { + "log": { + "type": "string", + "index": "analyzed" + }, + "logs": { + "type": "string", + "index": "analyzed" + }, + "input": { + "type": "string", + "index": "analyzed" + }, + "output": { + "type": "string", + "index": "analyzed" + } + } }, - { - "long_fields": { - "mapping": { - "type": "long", - "doc_values": true - }, - "match": "*", - "match_mapping_type": "long" - } - }, - { - "double_fields": { - "mapping": { - "type": "double", - "doc_values": true - }, - "match": "*", - "match_mapping_type": "double" - } + "_default_": { + "dynamic_templates": [ + { + "string_fields": { + "mapping": { + "index": "not_analyzed", + "type": "string", + "doc_values": true + }, + "match": "*", + "match_mapping_type": "string" + } + }, + { + "long_fields": { + "mapping": { + "type": "long", + "doc_values": true + }, + "match": "*", + "match_mapping_type": "long" + } + }, + { + "double_fields": { + "mapping": { + "type": "double", + "doc_values": true + }, + "match": "*", + "match_mapping_type": "double" + } + } + ] } - ] } - } } \ No newline at end of file diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MockIndexDAO.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MockIndexDAO.java index 010a14cdf4..ca4e9167a9 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MockIndexDAO.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MockIndexDAO.java @@ -71,9 +71,14 @@ public void addMessage(String queue, Message msg) { } - @Override + @Override public String get(String workflowInstanceId, String key) { return null; } + + @Override + public List getTaskLogs(String taskId) { + return null; + } }