Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #34 from Netflix/dev
Browse files Browse the repository at this point in the history
Merge 1.6.0-rc1 to master
  • Loading branch information
v1r3n authored Jan 17, 2017
2 parents 11578aa + cd57159 commit 4fb004c
Show file tree
Hide file tree
Showing 142 changed files with 2,560 additions and 4,795 deletions.
4 changes: 4 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
docs/* linguist-documentation
server/src/main/resources/swagger-ui/* linguist-vendored


7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
![Conductor](https://netflix.github.io/conductor/img/corner-logo2.png)
![Conductor](docs/docs/img/conductor-vector-x.png)

## Conductor
Conductor is an _orchestration_ engine that runs in the cloud.
Expand All @@ -12,7 +12,7 @@ Conductor is an _orchestration_ engine that runs in the cloud.

[Getting Started](http://netflix.github.io/conductor/intro) guide.

## Get Condcutor
## Get Conductor
Binaries are available from Maven Central and jcenter.

|Group|Artifact|Latest Stable Version|
Expand All @@ -30,6 +30,7 @@ Below are the various artifacts published:
|conductor-ui|node.js based UI for Conductor|
|conductor-contribs|Optional contrib package that holds extended workflow tasks and support for SQS|
|conductor-client|Java client for Conductor that includes helpers for running a worker tasks|
|conductor-server|Self contained Jetty server|
|conductor-test-harness|Used for building test harness and an in-memory kitchensink demo|

## Building
Expand Down Expand Up @@ -58,7 +59,7 @@ com.netflix.conductor.dao.RedisESWorkflowModule

* The default persistence used is [Dynomite](https://github.com/Netflix/dynomite)
* For queues, we are relying on [dyno-queues](https://github.com/Netflix/dyno-queues)
* The indexing backend is [Elastic](https://www.elastic.co/) (2.+)
* The indexing backend is [Elasticsearch](https://www.elastic.co/) (2.+)

## Other Requirements
* JDK 1.8+
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ subprojects {

repositories {
jcenter()
maven { url 'https://dl.bintray.com/netflixoss/oss-candidate/' }
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,10 @@ protected <T> T getForEntity(String url, Object[] queryParams, GenericType<T> re
}

private Builder resource(URI URI, Object entity) {
return client.resource(URI).type(MediaType.APPLICATION_JSON).entity(entity).accept(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN);
return client.resource(URI).type(MediaType.APPLICATION_JSON).entity(entity).accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON);
}

private void handleException(Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ public void skipTaskFromWorkflow(String workflowId, String taskReferenceName) {
put("workflow/{workflowId}/skiptask/{taskReferenceName}", null, workflowId, taskReferenceName);
}

public void runDecider(String workflowName) {
put("workflow/decide/{workflowName}", null, workflowName);
public void runDecider(String workflowId) {
put("workflow/decide/{workflowId}", null, null, workflowId);
}

public SearchResult<WorkflowSummary> search(String query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public TaskResult() {

}

/**
*
* @return Workflow instance id for which the task result is produced
*/
public String getWorkflowInstanceId() {
return workflowInstanceId;
}
Expand Down Expand Up @@ -106,6 +110,10 @@ public void setWorkerId(String workerId) {
public Status getTaskStatus() {
return status;
}

public void setTaskStatus(Status status) {
this.status = status;
}

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class WorkflowDef extends Auditable {

private List<String> inputParameters = new LinkedList<String>();

private Map<String, String> outputParameters = new HashMap<>();
private Map<String, Object> outputParameters = new HashMap<>();

private String failureWorkflow;

Expand Down Expand Up @@ -109,14 +109,14 @@ public void setInputParameters(List<String> inputParameters) {
/**
* @return the outputParameters
*/
public Map<String, String> getOutputParameters() {
public Map<String, Object> getOutputParameters() {
return outputParameters;
}

/**
* @param outputParameters the outputParameters to set
*/
public void setOutputParameters(Map<String, String> outputParameters) {
public void setOutputParameters(Map<String, Object> outputParameters) {
this.outputParameters = outputParameters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static boolean is(String name) {

//Key: Name of the input parameter. MUST be one of the keys defined in TaskDef (e.g. fileName)
//Value: mapping of the parameter from another task (e.g. task1.someOutputParameterAsFileName)
private Map<String, String> inputParameters = new HashMap<String, String>();
private Map<String, Object> inputParameters = new HashMap<String, Object>();

private String type = Type.SIMPLE.name();

Expand Down Expand Up @@ -88,11 +88,6 @@ public static boolean is(String name) {

private List<String> joinOn = new LinkedList<>();

// Used for Titus, if Titus job calls back to update the task the value should be true
// else false. If false Workflow processing will only check if the titus job is finished and
// then move on
private boolean callbackFromWorker = true;

/**
* @return the name
*/
Expand Down Expand Up @@ -124,14 +119,14 @@ public void setTaskReferenceName(String taskReferenceName) {
/**
* @return the inputParameters
*/
public Map<String, String> getInputParameters() {
public Map<String, Object> getInputParameters() {
return inputParameters;
}

/**
* @param inputParameters the inputParameters to set
*/
public void setInputParameters(Map<String, String> inputParameters) {
public void setInputParameters(Map<String, Object> inputParameters) {
this.inputParameters = inputParameters;
}

Expand Down Expand Up @@ -297,14 +292,6 @@ public void setJoinOn(List<String> joinOn) {
this.joinOn = joinOn;
}

public boolean isCallbackFromWorker() {
return callbackFromWorker;
}

public void setCallbackFromWorker(boolean callbackFromWorker) {
this.callbackFromWorker = callbackFromWorker;
}

private Collection<List<WorkflowTask>> children(){
Collection<List<WorkflowTask>> v1 = new LinkedList<>();
Type tt = Type.USER_DEFINED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@
package com.netflix.conductor.contribs.http;


import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -87,7 +88,6 @@ public HttpTask(String name, RestClientManager rcm, Configuration config) {
logger.info("HttpTask initialized...");
}


@Override
public void start(Workflow workflow, Task task, WorkflowExecutor executor) throws Exception {
Object request = task.getInputData().get(requestParameter);
Expand Down Expand Up @@ -116,8 +116,13 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) throw

try {

Object response = httpCall(input);
task.setStatus(Status.COMPLETED);
HttpResponse response = httpCall(input);
if(response.statusCode > 199 && response.statusCode < 300) {
task.setStatus(Status.COMPLETED);
} else {
task.setReasonForIncompletion(response.body.toString());
task.setStatus(Status.FAILED);
}
if(response != null) {
task.getOutputData().put("response", response);
}
Expand All @@ -127,16 +132,15 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) throw
task.setReasonForIncompletion(e.getMessage());
task.getOutputData().put("response", e.getMessage());
}

}

/**
*
* @param input HTTP Request
* @return Response if any, null otherwise
* @return Response of the http call
* @throws Exception If there was an error making http call
*/
protected Object httpCall(Input input) throws Exception {
protected HttpResponse httpCall(Input input) throws Exception {
Client client = rcm.getClient(input);
Builder builder = client.resource(input.uri).type(MediaType.APPLICATION_JSON);
if(input.body != null) {
Expand All @@ -146,35 +150,30 @@ protected Object httpCall(Input input) throws Exception {
builder.header(e.getKey(), e.getValue());
});

Object response = null;
HttpResponse response = new HttpResponse();
try {

String json = builder.accept(input.accept).method(input.method, String.class);
logger.debug(json);
try {
JsonNode node = om.readTree(json);
if(node.isArray()) {
response = om.convertValue(node, listOfObj);
} else if (node.isObject()) {
response = om.convertValue(node, mapOfObj);
} else if (node.isNumber()) {
response = om.convertValue(node, Double.class);
} else {
response = node.asText();
}

}catch(JsonParseException jpe) {
logger.error(jpe.getMessage(), jpe);
response = json;

ClientResponse cr = builder.accept(input.accept).method(input.method, ClientResponse.class);
if (cr.hasEntity()) {
response.body = extractBody(cr);
}
response.statusCode = cr.getStatus();
response.headers = cr.getHeaders();
return response;

} catch(UniformInterfaceException ex) {

ClientResponse cr = ex.getResponse();

if(cr.getStatus() > 199 && cr.getStatus() < 300) {
return cr.getEntity(String.class);

if(cr.hasEntity()) {
response.body = extractBody(cr);
}
response.headers = cr.getHeaders();
response.statusCode = cr.getStatus();
return response;

}else {
String reason = cr.getEntity(String.class);
logger.error(reason, ex);
Expand All @@ -183,6 +182,30 @@ protected Object httpCall(Input input) throws Exception {
}
}

private Object extractBody(ClientResponse cr) {

String json = cr.getEntity(String.class);
logger.debug(json);

try {

JsonNode node = om.readTree(json);
if (node.isArray()) {
return om.convertValue(node, listOfObj);
} else if (node.isObject()) {
return om.convertValue(node, mapOfObj);
} else if (node.isNumber()) {
return om.convertValue(node, Double.class);
} else {
return node.asText();
}

} catch (IOException jpe) {
logger.error(jpe.getMessage(), jpe);
return json;
}
}

@Override
public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor) throws Exception {
if (task.getStatus().equals(Status.SCHEDULED)) {
Expand Down Expand Up @@ -212,6 +235,20 @@ private static ObjectMapper objectMapper() {
return om;
}

public static class HttpResponse {

public Object body;

public MultivaluedMap<String, String> headers;

public int statusCode;

@Override
public String toString() {
return "HttpResponse [body=" + body + ", headers=" + headers + ", statusCode=" + statusCode + "]";
}
}

public static class Input {

private String method; //PUT, POST, GET, DELETE, OPTIONS, HEAD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private void addPolicy(List<String> accountsToAuthorize) {
}

private String getPolicy(List<String> accountIds) {
Policy policy = new Policy("ReloadedWorkerAccessPolicy");
Policy policy = new Policy("AuthorizedWorkerAccessPolicy");
Statement stmt = new Statement(Effect.Allow);
Action action = SQSActions.SendMessage;
stmt.getActions().add(action);
Expand Down
Loading

0 comments on commit 4fb004c

Please sign in to comment.