Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #225 from Netflix/dev
Browse files Browse the repository at this point in the history
Task Logger Implementation for the worker
  • Loading branch information
v1r3n authored Jun 16, 2017
2 parents 056f8fa + bf4b581 commit 7f04fa9
Show file tree
Hide file tree
Showing 23 changed files with 284 additions and 225 deletions.
2 changes: 2 additions & 0 deletions client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ dependencies {
compile 'com.netflix.eureka:eureka-client:latest.release'
compile 'com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.7.5'
compile 'com.netflix.archaius:archaius-core:0.7.5'

testCompile 'org.slf4j:slf4j-log4j12:1.8.0-alpha1'
}
3 changes: 2 additions & 1 deletion client/python/conductor/ConductorWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
import sys
import time
import subprocess
Expand All @@ -37,6 +37,7 @@ def execute(self, task, exec_function):
raise Exception('Task execution function MUST return a response as a dict with status and output fields')
task['status'] = resp['status']
task['outputData'] = resp['output']
task['logs'] = resp['logs']
self.taskClient.updateTask(task)
except Exception as err:
print 'Error executing task: ' + str(err)
Expand Down
6 changes: 3 additions & 3 deletions client/python/kitchensink_workers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from conductor.ConductorWorker import ConductorWorker

def execute(task):
return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0}}
return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0}, 'logs': ['one','two']}

def execute4(task):
forkTasks = [{"name": "task_1", "taskReferenceName": "task_1_1", "type": "SIMPLE"},{"name": "sub_workflow_4", "taskReferenceName": "wf_dyn", "type": "SUB_WORKFLOW", "subWorkflowParam": {"name": "sub_flow_1"}}];
input = {'task_1_1': {}, 'wf_dyn': {}}
return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0, 'dynamicTasks': forkTasks, 'inputs': input}}
return {'status': 'COMPLETED', 'output': {'mod': 5, 'taskToExecute': 'task_1', 'oddEven': 0, 'dynamicTasks': forkTasks, 'inputs': input}, 'logs': ['one','two']}

def main():
print 'Hello World'
print 'Starting Kitchensink workflows'
cc = ConductorWorker('http://localhost:8080/api', 1, 0.1)
for x in range(1, 30):
if(x == 4):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public void updateTask(TaskResult task) {
postForEntity("tasks", task);
}

public void log(String taskId, String logMessage) {
postForEntity("tasks/" + taskId + "/log", logMessage);
}

/**
* Ack for the task poll
* @param taskId Id of the task to be polled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
*/
package com.netflix.conductor.client.task;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -40,7 +38,6 @@
import com.netflix.conductor.client.worker.PropertyFactory;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.discovery.EurekaClient;
import com.netflix.servo.monitor.Stopwatch;
Expand Down Expand Up @@ -72,9 +69,8 @@ public class WorkflowTaskCoordinator {

private int threadCount;

private static Map<Worker, Map<String, Object>> environmentData = new HashMap<>();

private static final String DOMAIN = "domain";

private static final String ALL_WORKERS = "all";

/**
Expand Down Expand Up @@ -253,7 +249,6 @@ public Thread newThread(Runnable r) {
});
this.ses = Executors.newScheduledThreadPool(workers.size());
workers.forEach(worker -> {
environmentData.put(worker, getEnvData(worker));
ses.scheduleWithFixedDelay(()->pollForTask(worker), worker.getPollingInterval(), worker.getPollingInterval(), TimeUnit.MILLISECONDS);
});

Expand Down Expand Up @@ -340,13 +335,16 @@ private void execute(Worker worker, Task task) {

} catch (Exception e) {
logger.error("Unable to execute task {}", task, e);
if (result == null) {
task.setStatus(Task.Status.FAILED);
result = new TaskResult(task);
}
handleException(e, result, worker, false, task);
} finally {
sw.stop();
}

logger.debug("Task {} executed by worker {} with status {}", task.getTaskId(), worker.getClass().getSimpleName(), task.getStatus());
result.getLog().getEnvironment().putAll(environmentData.get(worker));
updateWithRetry(updateRetryCount, task, result, worker);

}
Expand Down Expand Up @@ -383,28 +381,6 @@ public int getUpdateRetryCount() {
return updateRetryCount;
}

static Map<String, Object> getEnvData(Worker worker) {
List<String> props = worker.getLoggingEnvProps();
Map<String, Object> data = new HashMap<>();
if(props == null || props.isEmpty()) {
return data;
}
String workerName = worker.getTaskDefName();
for(String property : props) {
property = property.trim();
String defaultValue = System.getenv(property);
String value = PropertyFactory.getString(workerName, property, defaultValue);
data.put(property, value);
}

try {
data.put("HOSTNAME", InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {

}
return data;
}

private void updateWithRetry(int count, Task task, TaskResult result, Worker worker) {

if(count < 0) {
Expand Down Expand Up @@ -432,11 +408,12 @@ private void handleException(Throwable t, TaskResult result, Worker worker, bool
WorkflowTaskMetrics.executionException(worker.getTaskDefName(), t);
result.setStatus(TaskResult.Status.FAILED);
result.setReasonForIncompletion("Error while executing the task: " + t);
TaskExecLog execLog = result.getLog();
execLog.setError(t.getMessage());
for (StackTraceElement ste : t.getStackTrace()) {
execLog.getErrorTrace().add(ste.toString());
}

StringWriter sw = new StringWriter();
t.printStackTrace(new PrintWriter(sw));
result.log(sw.toString());

updateWithRetry(updateRetryCount, task, result, worker);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

import com.netflix.conductor.common.metadata.tasks.Task;
Expand All @@ -30,8 +28,7 @@
*
*/
public interface Worker {



public String getTaskDefName();

/**
Expand Down Expand Up @@ -102,15 +99,7 @@ public default int getPollCount() {
public default int getPollingInterval() {
return PropertyFactory.getInteger(getTaskDefName(), "pollInterval", 1000);
}

/**
*
* @return Returns a list of environment or system variables that should be logged
*/
public default List<String> getLoggingEnvProps() {
String keys = PropertyFactory.getString(getTaskDefName(), "taskLogProps", "HOSTNAME,USER,EC2_INSTANCE_ID");
return Arrays.asList(keys.split(","));
}

/**
*
* @return Time to wait when making a poll to workflow server for tasks. The client will wait for at-least specified seconds for task queue to be "filled".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,31 @@
package com.netflix.conductor.client.task;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
* @author Viren
*
*/
public class WorkflowTaskCoordinatorTests {

@Test
public void testLoggingEnvironment() {
Worker worker = Worker.create("test", (Task task)-> new TaskResult(task));
List<String> keys = worker.getLoggingEnvProps();

Map<String, Object> env = WorkflowTaskCoordinator.getEnvData(worker);
assertNotNull(env);
assertTrue(!env.isEmpty());
Set<String> loggedKeys = env.keySet();
for(String key : keys) {
assertTrue(loggedKeys.contains(key));
}
}

@Test(expected=IllegalArgumentException.class)
public void testNoWorkersException() {
Expand Down
9 changes: 9 additions & 0 deletions client/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=INFO, A1

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ public WorkflowTask getWorkflowTask() {

/**
*
* @param workflowTask
* @param workflowTask Task definition
*/
public void setWorkflowTask(WorkflowTask workflowTask) {
this.workflowTask = workflowTask;
Expand Down
Loading

0 comments on commit 7f04fa9

Please sign in to comment.