Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #734 from Netflix/externalize_large_payloads
Browse files Browse the repository at this point in the history
store large payloads in external storage - part 1
  • Loading branch information
apanicker-nflx authored Sep 10, 2018
2 parents 854bcbc + b1db3b5 commit 55c1c4c
Show file tree
Hide file tree
Showing 87 changed files with 4,475 additions and 1,195 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2018 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.conductor.client.config;

public interface ConductorClientConfiguration {

/**
* @return the workflow input payload size threshold in KB,
* beyond which the payload will be processed based on {@link ConductorClientConfiguration#isExternalPayloadStorageEnabled()}.
*/
int getWorkflowInputPayloadThresholdKB();

/**
* @return the max value of workflow input payload size threshold in KB,
* beyond which the payload will be rejected regardless external payload storage is enabled.
*/
int getWorkflowInputMaxPayloadThresholdKB();

/**
* @return the task output payload size threshold in KB,
* beyond which the payload will be processed based on {@link ConductorClientConfiguration#isExternalPayloadStorageEnabled()}.
*/
int getTaskOutputPayloadThresholdKB();

/**
* @return the max value of task output payload size threshold in KB,
* beyond which the payload will be rejected regardless external payload storage is enabled.
*/
int getTaskOutputMaxPayloadThresholdKB();

/**
* @return the flag which controls the use of external storage for storing workflow/task
* input and output JSON payloads with size greater than threshold.
* If it is set to true, the payload is stored in external location.
* If it is set to false, the payload is rejected and the task/workflow execution fails.
*/
boolean isExternalPayloadStorageEnabled();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2018 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.conductor.client.config;

/**
* A default implementation of {@link ConductorClientConfiguration}
* where external payload storage is disabled.
*/
public class DefaultConductorClientConfiguration implements ConductorClientConfiguration {

@Override
public int getWorkflowInputPayloadThresholdKB() {
return 5120;
}

@Override
public int getWorkflowInputMaxPayloadThresholdKB() {
return 10240;
}

@Override
public int getTaskOutputPayloadThresholdKB() {
return 3072;
}

@Override
public int getTaskOutputMaxPayloadThresholdKB() {
return 10240;
}

@Override
public boolean isExternalPayloadStorageEnabled() {
return false;
}
}
116 changes: 77 additions & 39 deletions client/src/main/java/com/netflix/conductor/client/http/ClientBase.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,17 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/

package com.netflix.conductor.client.http;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.common.base.Preconditions;
import com.netflix.conductor.client.config.ConductorClientConfiguration;
import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
import com.netflix.conductor.client.exceptions.ConductorClientException;
import com.netflix.conductor.client.exceptions.ErrorResponse;
import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.ClientHandlerException;
Expand All @@ -33,14 +36,18 @@
import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriBuilder;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;

/**
Expand All @@ -56,15 +63,23 @@ public abstract class ClientBase {

protected ObjectMapper objectMapper;

protected PayloadStorage payloadStorage;

protected ConductorClientConfiguration conductorClientConfiguration;

protected ClientBase() {
this(new DefaultClientConfig(), null);
this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
}

protected ClientBase(ClientConfig config) {
this(config, new DefaultConductorClientConfiguration(), null);
}

protected ClientBase(ClientConfig clientConfig) {
this(clientConfig, null);
protected ClientBase(ClientConfig config, ClientHandler handler) {
this(config, new DefaultConductorClientConfiguration(), handler);
}

protected ClientBase(ClientConfig clientConfig, ClientHandler handler) {
protected ClientBase(ClientConfig config, ConductorClientConfiguration clientConfiguration, ClientHandler handler) {
objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
Expand All @@ -73,13 +88,16 @@ protected ClientBase(ClientConfig clientConfig, ClientHandler handler) {
objectMapper.setSerializationInclusion(Include.NON_EMPTY);

JacksonJsonProvider provider = new JacksonJsonProvider(objectMapper);
clientConfig.getSingletons().add(provider);
config.getSingletons().add(provider);

if (handler == null) {
this.client = Client.create(clientConfig);
this.client = Client.create(config);
} else {
this.client = new Client(handler, clientConfig);
this.client = new Client(handler, config);
}

conductorClientConfiguration = clientConfiguration;
payloadStorage = new PayloadStorage(this);
}

public void setRootURI(String root) {
Expand All @@ -92,7 +110,6 @@ protected void delete(String url, Object... uriVariables) {

protected void delete(Object[] queryParams, String url, Object... uriVariables) {
URI uri = null;
ClientResponse clientResponse = null;
try {
uri = getURIBuilder(root + url, queryParams).build(uriVariables);
client.resource(uri).delete();
Expand All @@ -111,28 +128,11 @@ protected void put(String url, Object[] queryParams, Object request, Object... u
}
}

/**
* @deprecated replaced by {@link #postForEntityWithRequestOnly(String, Object)} ()}
*/
@Deprecated
protected void postForEntity(String url, Object request) {
postForEntityWithRequestOnly(url, request);
}


protected void postForEntityWithRequestOnly(String url, Object request) {
Class<?> type = null;
postForEntity(url, request, null, type);
}

/**
* @deprecated replaced by {@link #postForEntityWithUriVariablesOnly(String, Object...)} ()}
*/
@Deprecated
protected void postForEntity1(String url, Object... uriVariables) {
postForEntityWithUriVariablesOnly(url, uriVariables);
}

protected void postForEntityWithUriVariablesOnly(String url, Object... uriVariables) {
Class<?> type = null;
postForEntity(url, null, null, type, uriVariables);
Expand Down Expand Up @@ -165,26 +165,27 @@ private <T> T postForEntity(String url, Object request, Object[] queryParams, Ob
return null;
}

protected <T> T getForEntity(String url, Object[] queryParams, Class<T> responseType, Object... uriVariables) {
<T> T getForEntity(String url, Object[] queryParams, Class<T> responseType, Object... uriVariables) {
return getForEntity(url, queryParams, response -> response.getEntity(responseType), uriVariables);
}

protected <T> T getForEntity(String url, Object[] queryParams, GenericType<T> responseType, Object... uriVariables) {
<T> T getForEntity(String url, Object[] queryParams, GenericType<T> responseType, Object... uriVariables) {
return getForEntity(url, queryParams, response -> response.getEntity(responseType), uriVariables);
}

private <T> T getForEntity(String url, Object[] queryParams, Function<ClientResponse, T> entityPvoider, Object... uriVariables) {
private <T> T getForEntity(String url, Object[] queryParams, Function<ClientResponse, T> entityProvider, Object... uriVariables) {
URI uri = null;
ClientResponse clientResponse = null;
ClientResponse clientResponse;
try {
uri = getURIBuilder(root + url, queryParams).build(uriVariables);
clientResponse = client.resource(uri)
.accept(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
if (clientResponse.getStatus() < 300) {
return entityPvoider.apply(clientResponse);
return entityProvider.apply(clientResponse);

} else {
throw new UniformInterfaceException(clientResponse); // let handleUniformInterfaceException to handle unexpected response consistently
throw new UniformInterfaceException(clientResponse);
}
} catch (UniformInterfaceException e) {
handleUniformInterfaceException(e, uri);
Expand All @@ -194,11 +195,49 @@ private <T> T getForEntity(String url, Object[] queryParams, Function<ClientResp
return null;
}

/**
* Uses the {@link PayloadStorage} for storing large payloads.
* Gets the uri for storing the payload from the server and then uploads to this location.
*
* @param payloadType the {@link com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType} to be uploaded
* @param payloadBytes the byte array containing the payload
* @param payloadSize the size of the payload
* @return the path where the payload is stored in external storage
*/
protected String uploadToExternalPayloadStorage(ExternalPayloadStorage.PayloadType payloadType, byte[] payloadBytes, long payloadSize) {
Preconditions.checkArgument(payloadType.equals(ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT) || payloadType.equals(ExternalPayloadStorage.PayloadType.TASK_OUTPUT),
"Payload type must be workflow input or task output");
ExternalStorageLocation externalStorageLocation = payloadStorage.getLocation(ExternalPayloadStorage.Operation.WRITE, payloadType, "");
payloadStorage.upload(externalStorageLocation.getUri(), new ByteArrayInputStream(payloadBytes), payloadSize);
return externalStorageLocation.getPath();
}

/**
* Uses the {@link PayloadStorage} for downloading large payloads to be used by the client.
* Gets the uri of the payload fom the server and then downloads from this location.
*
* @param payloadType the {@link com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType} to be downloaded
* @param path the relative of the payload in external storage
* @return the payload object that is stored in external storage
*/
@SuppressWarnings("unchecked")
protected Map<String, Object> downloadFromExternalStorage(ExternalPayloadStorage.PayloadType payloadType, String path) {
Preconditions.checkArgument(StringUtils.isNotBlank(path), "uri cannot be blank");
ExternalStorageLocation externalStorageLocation = payloadStorage.getLocation(ExternalPayloadStorage.Operation.READ, payloadType, path);
try (InputStream inputStream = payloadStorage.download(externalStorageLocation.getUri())) {
return objectMapper.readValue(inputStream, Map.class);
} catch (IOException e) {
String errorMsg = String.format("Unable to download payload frome external storage location: %s", path);
logger.error(errorMsg, e);
throw new ConductorClientException(errorMsg, e);
}
}

private Builder getWebResourceBuilder(URI URI, Object entity) {
return client.resource(URI).type(MediaType.APPLICATION_JSON).entity(entity).accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON);
}

private void handleClientHandlerException(ClientHandlerException exception, URI uri){
private void handleClientHandlerException(ClientHandlerException exception, URI uri) {
String errorMessage = String.format("Unable to invoke Conductor API with uri: %s, failure to process request or response", uri);
logger.error(errorMessage, exception);
throw new ConductorClientException(errorMessage, exception);
Expand All @@ -221,7 +260,7 @@ private void handleUniformInterfaceException(UniformInterfaceException exception
}
String errorMessage = clientResponse.getEntity(String.class);
logger.error("Unable to invoke Conductor API with uri: {}, unexpected response from server: {}", uri, clientResponseToString(exception.getResponse()), exception);
ErrorResponse errorResponse = null;
ErrorResponse errorResponse;
try {
errorResponse = objectMapper.readValue(errorMessage, ErrorResponse.class);
} catch (IOException e) {
Expand All @@ -243,7 +282,7 @@ private void handleException(URI uri, RuntimeException e) {
if (e instanceof UniformInterfaceException) {
handleUniformInterfaceException(((UniformInterfaceException) e), uri);
} else if (e instanceof ClientHandlerException) {
handleClientHandlerException((ClientHandlerException)e, uri);
handleClientHandlerException((ClientHandlerException) e, uri);
} else {
handleRuntimeException(e, uri);
}
Expand Down Expand Up @@ -296,5 +335,4 @@ private UriBuilder getURIBuilder(String path, Object[] queryParams) {
}
return builder;
}

}
Loading

0 comments on commit 55c1c4c

Please sign in to comment.