Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #231 from Netflix/dev
Browse files Browse the repository at this point in the history
Task Logger Changes
  • Loading branch information
v1r3n authored Jun 20, 2017
2 parents 7f04fa9 + a492fed commit 6b4be0a
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,39 @@
*/
package com.netflix.conductor.common.metadata.tasks;

import java.util.LinkedList;
import java.util.List;

/**
* @author Viren
* Model that represents the task's execution log.
*/
public class TaskExecLog {

private List<String> logs = new LinkedList<>();
private String log;

private String taskId;

private long createdTime;

public TaskExecLog() {}

public TaskExecLog(String log) {
this.log =log;
this.createdTime = System.currentTimeMillis();
}

/**
*
* @return Task Execution Logs
* @return Task Exec Log
*/
public List<String> getLogs() {
return logs;
public String getLog() {
return log;
}

/**
*
* @param logs Task Execution Logs
* @param log The Log
*/
public void setLogs(List<String> logs) {
this.logs = logs;
public void setLog(String log) {
this.log = log;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public enum Status {

private Map<String, Object> outputData = new HashMap<>();

private List<String> logs = new CopyOnWriteArrayList<>();
private List<TaskExecLog> logs = new CopyOnWriteArrayList<>();

public TaskResult(Task task) {
this.workflowInstanceId = task.getWorkflowInstanceId();
Expand Down Expand Up @@ -168,15 +168,15 @@ public TaskResult addOutputData(String key, Object value) {
*
* @return Task execution logs
*/
public List<String> getLogs() {
public List<TaskExecLog> getLogs() {
return logs;
}

/**
*
* @param logs Task execution logs
*/
public void setLogs(List<String> logs) {
public void setLogs(List<TaskExecLog> logs) {
this.logs = logs;
}

Expand All @@ -187,7 +187,7 @@ public void setLogs(List<String> logs) {
* @return Instance of TaskResult
*/
public TaskResult log(String log) {
this.logs.add(log);
this.logs.add(new TaskExecLog(log));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.SkipTaskRequest;
Expand Down Expand Up @@ -447,11 +446,8 @@ public void updateTask(TaskResult result) throws Exception {
}
edao.updateTask(task);

TaskExecLog tlog = new TaskExecLog();
tlog.setTaskId(task.getTaskId());
tlog.setLogs(result.getLogs());
tlog.setCreatedTime(System.currentTimeMillis());
edao.addTaskExecLog(tlog);
result.getLogs().forEach(tl -> tl.setTaskId(task.getTaskId()));
edao.addTaskExecLog(result.getLogs());

switch (task.getStatus()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public interface ExecutionDAO {
* @param log Task Execution Log to be added
*
*/
public abstract void addTaskExecLog(TaskExecLog log);
public abstract void addTaskExecLog(List<TaskExecLog> log);

/**
*
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/com/netflix/conductor/dao/IndexDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ public interface IndexDAO {

/**
*
* @param log Task Execution log to be indexed
* @param logs Task Execution logs to be indexed
*/
public void add(TaskExecLog log);
public void add(List<TaskExecLog> logs);

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.conductor.service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -359,9 +360,9 @@ public void addMessage(String queue, Message msg) {
public void log(String taskId, String log) {
TaskExecLog executionLog = new TaskExecLog();
executionLog.setTaskId(taskId);
executionLog.getLogs().add(log);
executionLog.setLog(log);
executionLog.setCreatedTime(System.currentTimeMillis());
edao.addTaskExecLog(executionLog);
edao.addTaskExecLog(Arrays.asList(executionLog));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public boolean exceedsInProgressLimit(Task task) {
}

@Override
public void addTaskExecLog(TaskExecLog log) {
public void addTaskExecLog(List<TaskExecLog> log) {
indexer.add(log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
Expand All @@ -56,6 +57,7 @@
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -235,19 +237,23 @@ public void index(Task task) {
}

@Override
public void add(TaskExecLog taskExecLog) {
public void add(List<TaskExecLog> taskExecLogs) {

if (taskExecLog.getLogs().isEmpty()) {
if (taskExecLogs.isEmpty()) {
return;
}

int retry = 3;
while(retry > 0) {
try {

IndexRequest request = new IndexRequest(logIndexName, LOG_DOC_TYPE);
request.source(om.writeValueAsBytes(taskExecLog));
client.index(request).actionGet();
BulkRequestBuilder brb = client.prepareBulk();
for(TaskExecLog taskExecLog : taskExecLogs) {
IndexRequest request = new IndexRequest(logIndexName, LOG_DOC_TYPE);
request.source(om.writeValueAsBytes(taskExecLog));
brb.add(request);
}
brb.execute().actionGet();
break;

} catch (Throwable e) {
Expand All @@ -274,7 +280,7 @@ public List<TaskExecLog> getTaskLogs(String taskId) {
QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery("*");
BoolQueryBuilder fq = QueryBuilders.boolQuery().must(stringQuery).must(filterQuery);

final SearchRequestBuilder srb = client.prepareSearch(logIndexPrefix + "*").setQuery(fq).setTypes(TASK_DOC_TYPE);
final SearchRequestBuilder srb = client.prepareSearch(logIndexPrefix + "*").setQuery(fq).setTypes(TASK_DOC_TYPE).addSort(SortBuilders.fieldSort("createdTime").order(SortOrder.ASC));
SearchResponse response = srb.execute().actionGet();
SearchHit[] hits = response.getHits().getHits();
List<TaskExecLog> logs = new ArrayList<>(hits.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public void index(Task task) {
}

@Override
public void add(TaskExecLog log) {
public void add(List<TaskExecLog> logs) {

}

@Override
Expand Down
22 changes: 22 additions & 0 deletions ui/src/actions/WorkflowActions.js
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,25 @@ export function getEvents(event, time, query) {
});
}
}

export function getTaskLogs(taskId) {

return function (dispatch) {
dispatch({
type: 'GET_TASK_LOGS'
});


return http.get('/api/wfe/task/log' + taskId).then((data) => {
dispatch({
type: 'RECEIVED_GET_TASK_LOGS',
logs : data
});
}).catch((e) => {
dispatch({
type: 'REQUEST_ERROR',
e
});
});
}
}
22 changes: 21 additions & 1 deletion ui/src/api/wfe.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { join } from 'path';
import { Router } from 'express';
import http from '../core/HttpClient';
import moment from 'moment';

const router = new Router();
const baseURL = process.env.WF_SERVER;
Expand Down Expand Up @@ -48,7 +49,7 @@ router.get('/id/:workflowId', async (req, res, next) => {
const meta = await http.get(baseURLMeta + 'workflow/' + result.workflowType + '?version=' + result.version);
const subs = [];
const subworkflows = {};
result.tasks.forEach(task=>{
result.tasks.forEach(task => {
if(task.taskType == 'SUB_WORKFLOW'){
let subWorkflowId = task.outputData && task.outputData.subWorkflowId;
if(subWorkflowId == null) {
Expand All @@ -59,6 +60,17 @@ router.get('/id/:workflowId', async (req, res, next) => {
}
}
});
for(let t = 0; t < result.tasks.length; t++) {
let task = result.tasks[t];
let logs = await http.get(baseURLTask + task.taskId + '/log');
logs = logs || [];
let logs2 = [];
logs.forEach(log => {
const dtstr = moment(log.createdTime).format('MM/DD/YY, HH:mm:ss:SSS');
logs2.push(dtstr + ' : ' + log.log);
});
task.logs = logs2;
}
let submeta = {};
for(let i = 0; i < subs.length; i++){
let submeta = await http.get(baseURLMeta + 'workflow/' + subs[i].name + '?version=' + subs[i].version);
Expand Down Expand Up @@ -143,6 +155,14 @@ router.get('/metadata/taskdef', async (req, res, next) => {
next(err);
}
});
router.get('/task/log/:taskId', async (req, res, next) => {
try {
const logs = await http.get(baseURLTask + req.params.taskId + '/log');
res.status(200).send({logs});
} catch (err) {
next(err);
}
});
router.get('/queue/data', async (req, res, next) => {
try {
const sizes = await http.get(baseURLTask + 'queue/all');
Expand Down
4 changes: 4 additions & 0 deletions ui/src/api/wfegraph.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class Workflow2Graph {
style = 'stroke: #48a770; fill: #48a770';
labelStyle = 'fill:#ffffff; stroke-width: 1px';
break;
case 'COMPLETED_WITH_ERRORS':
style = 'stroke: #FF8C00; fill: #FF8C00';
labelStyle = 'fill:#ffffff; stroke-width: 1px';
break;
case 'SKIPPED':
style = 'stroke: #cccccc; fill: #ccc';
labelStyle = 'fill:#ffffff; stroke-width: 1px';
Expand Down
5 changes: 5 additions & 0 deletions ui/src/components/common/Grapher.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Grapher extends Component {
super(props);
this.state = {}
this.state.selectedTask = {};
this.state.logs = {};
this.state.edges = props.edges || [];
this.state.vertices = props.vertices || {};
this.state.layout = 'TD';
Expand Down Expand Up @@ -204,6 +205,10 @@ class Grapher extends Component {
<i title="copy to clipboard" className="btn fa fa-clipboard" data-clipboard-target="#t_json"></i>
<pre id="t_json">{JSON.stringify(this.state.selectedTask, null, 3)}</pre>
</Tab>
<Tab eventKey={3} title="Logs"><br/>
<i title="copy to clipboard" className="btn fa fa-clipboard" data-clipboard-target="#t_logs"></i>
<pre id="t_logs">{JSON.stringify(this.state.selectedTask.logs, null, 3)}</pre>
</Tab>
</Tabs>
</div>
<svg>
Expand Down
2 changes: 0 additions & 2 deletions ui/src/components/workflow/executions/WorkflowDia.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class WorkflowDia extends Component {

render() {



var wf = this.props.data;
if(wf == null) {
wf = {};
Expand Down
14 changes: 14 additions & 0 deletions ui/src/reducers/workflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,20 @@ export default function workflows(state = initialState, action) {
pausing: false,
resumign: false
};
case 'GET_TASK_LOGS':
return {
...state,
fetching: true,
error: false
};
case 'RECEIVED_GET_TASK_LOGS':
return {
...state,
logs: action.logs,
error: false,
fetching: false,
refetch: false
};
default:
return state;
};
Expand Down

0 comments on commit 6b4be0a

Please sign in to comment.