From ba3c5b9a33cdbbe694845b75e350ad1316e42094 Mon Sep 17 00:00:00 2001
From: jxu-nflx <90653611+jxu-nflx@users.noreply.github.com>
Date: Mon, 15 Aug 2022 10:37:33 -0700
Subject: [PATCH] revert to c89e78081d6ff7e80740be35fb2a11e69a1b7a47 (#3172)
---
.../exception/RequestHandlerException.java | 46 --
.../conductor/client/http/ClientBase.java | 438 ++++++++----------
.../client/http/ClientRequestHandler.java | 87 ++++
.../conductor/client/http/EventClient.java | 76 ++-
.../conductor/client/http/MetadataClient.java | 68 ++-
.../conductor/client/http/PayloadStorage.java | 185 ++++++++
.../conductor/client/http/RequestHandler.java | 33 --
.../conductor/client/http/TaskClient.java | 96 +++-
.../conductor/client/http/WorkflowClient.java | 91 +++-
.../http/jersey/JerseyRequestHandler.java | 236 ----------
.../client/http/ClientSpecification.groovy | 16 +-
.../client/http/EventClientSpec.groovy | 14 +-
.../client/http/MetadataClientSpec.groovy | 16 +-
.../client/http/TaskClientSpec.groovy | 26 +-
.../client/http/WorkflowClientSpec.groovy | 22 +-
.../automator/TaskRunnerConfigurerTest.java | 21 +-
.../workflow/executor/WorkflowExecutor.java | 11 +-
17 files changed, 765 insertions(+), 717 deletions(-)
delete mode 100644 client/src/main/java/com/netflix/conductor/client/exception/RequestHandlerException.java
create mode 100644 client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java
create mode 100644 client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java
delete mode 100644 client/src/main/java/com/netflix/conductor/client/http/RequestHandler.java
delete mode 100644 client/src/main/java/com/netflix/conductor/client/http/jersey/JerseyRequestHandler.java
diff --git a/client/src/main/java/com/netflix/conductor/client/exception/RequestHandlerException.java b/client/src/main/java/com/netflix/conductor/client/exception/RequestHandlerException.java
deleted file mode 100644
index 7553d499cf..0000000000
--- a/client/src/main/java/com/netflix/conductor/client/exception/RequestHandlerException.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2022 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package com.netflix.conductor.client.exception;
-
-import java.io.InputStream;
-
-public class RequestHandlerException extends RuntimeException {
-
- private InputStream response;
- private int status;
-
- public RequestHandlerException(InputStream response, int status) {
- this.response = response;
- this.status = status;
- }
-
- public RequestHandlerException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public RequestHandlerException(String message) {
- super(message);
- }
-
- public InputStream getResponse() {
- return response;
- }
-
- public int getStatus() {
- return status;
- }
-
- public boolean hasResponse() {
- return response != null;
- }
-}
diff --git a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java
index 603697751b..a27da869d0 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java
@@ -12,26 +12,17 @@
*/
package com.netflix.conductor.client.http;
-import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
-import java.util.Optional;
+import java.util.function.Function;
-import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ObjectUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,35 +30,38 @@
import com.netflix.conductor.client.config.ConductorClientConfiguration;
import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
import com.netflix.conductor.client.exception.ConductorClientException;
-import com.netflix.conductor.client.exception.RequestHandlerException;
-import com.netflix.conductor.client.http.jersey.JerseyRequestHandler;
import com.netflix.conductor.common.config.ObjectMapperProvider;
+import com.netflix.conductor.common.model.BulkResponse;
import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.common.validation.ErrorResponse;
import com.fasterxml.jackson.core.Version;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource.Builder;
/** Abstract client for the REST server */
public abstract class ClientBase {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientBase.class);
- private final RequestHandler requestHandler;
+ protected ClientRequestHandler requestHandler;
- private String root = "";
+ protected String root = "";
- protected final ObjectMapper objectMapper;
+ protected ObjectMapper objectMapper;
- private final PayloadStorage payloadStorage;
+ protected PayloadStorage payloadStorage;
- protected final ConductorClientConfiguration conductorClientConfiguration;
+ protected ConductorClientConfiguration conductorClientConfiguration;
protected ClientBase(
- RequestHandler requestHandler, ConductorClientConfiguration clientConfiguration) {
+ ClientRequestHandler requestHandler, ConductorClientConfiguration clientConfiguration) {
this.objectMapper = new ObjectMapperProvider().getObjectMapper();
// https://github.com/FasterXML/jackson-databind/issues/2683
@@ -75,13 +69,11 @@ protected ClientBase(
objectMapper.registerModule(new JavaTimeModule());
}
- // we do not want to use defaultIfNull here since creation of JerseyRequestHandler requires
- // classes that may not be in the classpath
- this.requestHandler = requestHandler != null ? requestHandler : new JerseyRequestHandler();
+ this.requestHandler = requestHandler;
this.conductorClientConfiguration =
ObjectUtils.defaultIfNull(
clientConfiguration, new DefaultConductorClientConfiguration());
- this.payloadStorage = new PayloadStorage();
+ this.payloadStorage = new PayloadStorage(this);
}
public void setRootURI(String root) {
@@ -89,34 +81,51 @@ public void setRootURI(String root) {
}
protected void delete(String url, Object... uriVariables) {
- deleteWithUriVariables(url, null, uriVariables);
+ deleteWithUriVariables(null, url, uriVariables);
}
protected void deleteWithUriVariables(
- String url, Object[] queryParams, Object... uriVariables) {
- URI uri = getURIBuilder(getFullUrl(url), queryParams).build(uriVariables);
+ Object[] queryParams, String url, Object... uriVariables) {
+ delete(queryParams, url, uriVariables, null);
+ }
+
+ protected BulkResponse deleteWithRequestBody(Object[] queryParams, String url, Object body) {
+ return delete(queryParams, url, null, body);
+ }
+
+ private BulkResponse delete(
+ Object[] queryParams, String url, Object[] uriVariables, Object body) {
+ URI uri = null;
+ BulkResponse response = null;
try {
- requestHandler.delete(uri);
- } catch (RequestHandlerException rhe) {
- throw createClientException(rhe);
+ uri = getURIBuilder(root + url, queryParams).build(uriVariables);
+ response = requestHandler.delete(uri, body);
+ } catch (UniformInterfaceException e) {
+ handleUniformInterfaceException(e, uri);
+ } catch (RuntimeException e) {
+ handleRuntimeException(e, uri);
}
+ return response;
}
protected void put(String url, Object[] queryParams, Object request, Object... uriVariables) {
- URI uri = getURIBuilder(getFullUrl(url), queryParams).build(uriVariables);
+ URI uri = null;
try {
- requestHandler.put(uri, request);
- } catch (RequestHandlerException rhe) {
- throw createClientException(rhe);
+ uri = getURIBuilder(root + url, queryParams).build(uriVariables);
+ requestHandler.getWebResourceBuilder(uri, request).put();
+ } catch (RuntimeException e) {
+ handleException(uri, e);
}
}
- protected void post(String url, Object request) {
- postForEntity(url, request, null, null);
+ protected void postForEntityWithRequestOnly(String url, Object request) {
+ Class> type = null;
+ postForEntity(url, request, null, type);
}
- protected void postWithUriVariables(String url, Object... uriVariables) {
- postForEntity(url, null, null, null, uriVariables);
+ protected void postForEntityWithUriVariablesOnly(String url, Object... uriVariables) {
+ Class> type = null;
+ postForEntity(url, null, null, type, uriVariables);
}
protected T postForEntity(
@@ -125,56 +134,98 @@ protected T postForEntity(
Object[] queryParams,
Class responseType,
Object... uriVariables) {
- URI uri = getURIBuilder(getFullUrl(url), queryParams).build(uriVariables);
+ return postForEntity(
+ url,
+ request,
+ queryParams,
+ responseType,
+ builder -> builder.post(responseType),
+ uriVariables);
+ }
+
+ protected T postForEntity(
+ String url,
+ Object request,
+ Object[] queryParams,
+ GenericType responseType,
+ Object... uriVariables) {
+ return postForEntity(
+ url,
+ request,
+ queryParams,
+ responseType,
+ builder -> builder.post(responseType),
+ uriVariables);
+ }
+ private T postForEntity(
+ String url,
+ Object request,
+ Object[] queryParams,
+ Object responseType,
+ Function postWithEntity,
+ Object... uriVariables) {
+ URI uri = null;
try {
- InputStream response = requestHandler.post(uri, request);
+ uri = getURIBuilder(root + url, queryParams).build(uriVariables);
+ Builder webResourceBuilder = requestHandler.getWebResourceBuilder(uri, request);
if (responseType == null) {
+ webResourceBuilder.post();
return null;
}
- return convertToType(response, responseType);
- } catch (RequestHandlerException rhe) {
- throw createClientException(rhe);
- }
- }
-
- protected String postForString(
- String url, Object request, Object[] queryParams, Object... uriVariables) {
- URI uri = getURIBuilder(getFullUrl(url), queryParams).build(uriVariables);
- try {
- InputStream response = requestHandler.post(uri, request);
- return convertToString(response);
- } catch (RequestHandlerException rhe) {
- throw createClientException(rhe);
+ return postWithEntity.apply(webResourceBuilder);
+ } catch (UniformInterfaceException e) {
+ handleUniformInterfaceException(e, uri);
+ } catch (RuntimeException e) {
+ handleRuntimeException(e, uri);
}
+ return null;
}
protected T getForEntity(
String url, Object[] queryParams, Class responseType, Object... uriVariables) {
- return getForEntity(url, queryParams, uriVariables)
- .map(inputStream -> convertToType(inputStream, responseType))
- .orElse(null);
+ return getForEntity(
+ url, queryParams, response -> response.getEntity(responseType), uriVariables);
}
protected T getForEntity(
+ String url, Object[] queryParams, GenericType responseType, Object... uriVariables) {
+ return getForEntity(
+ url, queryParams, response -> response.getEntity(responseType), uriVariables);
+ }
+
+ private T getForEntity(
String url,
Object[] queryParams,
- TypeReference responseType,
+ Function entityProvider,
Object... uriVariables) {
- return getForEntity(url, queryParams, uriVariables)
- .map(inputStream -> convertToType(inputStream, responseType))
- .orElse(null);
+ URI uri = null;
+ ClientResponse clientResponse;
+ try {
+ uri = getURIBuilder(root + url, queryParams).build(uriVariables);
+ clientResponse = requestHandler.get(uri);
+ if (clientResponse.getStatus() < 300) {
+ return entityProvider.apply(clientResponse);
+ } else {
+ throw new UniformInterfaceException(clientResponse);
+ }
+ } catch (UniformInterfaceException e) {
+ handleUniformInterfaceException(e, uri);
+ } catch (RuntimeException e) {
+ handleRuntimeException(e, uri);
+ }
+ return null;
}
/**
* Uses the {@link PayloadStorage} for storing large payloads. Gets the uri for storing the
- * payload from the server and then uploads to this location.
+ * payload from the server and then uploads to this location
*
* @param payloadType the {@link
- * com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType} to be uploaded.
- * @param payloadBytes the byte array containing the payload.
- * @param payloadSize the size of the payload.
- * @return the path where the payload is stored in external storage.
+ * com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType} to be uploaded
+ * @param payloadBytes the byte array containing the payload
+ * @param payloadSize the size of the payload
+ * @return the path where the payload is stored in external storage
*/
protected String uploadToExternalPayloadStorage(
ExternalPayloadStorage.PayloadType payloadType, byte[] payloadBytes, long payloadSize) {
@@ -218,13 +269,10 @@ protected Map downloadFromExternalStorage(
}
}
- private String getFullUrl(String url) {
- return root + url;
- }
-
private UriBuilder getURIBuilder(String path, Object[] queryParams) {
- path = StringUtils.trimToEmpty(path);
-
+ if (path == null) {
+ path = "";
+ }
UriBuilder builder = UriBuilder.fromPath(path);
if (queryParams != null) {
for (int i = 0; i < queryParams.length; i += 2) {
@@ -248,204 +296,92 @@ protected boolean isNewerJacksonVersion() {
return version.getMajorVersion() == 2 && version.getMinorVersion() >= 12;
}
- private Optional getForEntity(
- String url, Object[] queryParams, Object... uriVariables) {
- URI uri = getURIBuilder(getFullUrl(url), queryParams).build(uriVariables);
- try {
- return Optional.ofNullable(requestHandler.get(uri));
- } catch (RequestHandlerException rhe) {
- throw createClientException(rhe);
- }
+ private void handleClientHandlerException(ClientHandlerException exception, URI uri) {
+ String errorMessage =
+ String.format(
+ "Unable to invoke Conductor API with uri: %s, failure to process request or response",
+ uri);
+ LOGGER.error(errorMessage, exception);
+ throw new ConductorClientException(errorMessage, exception);
}
- private ConductorClientException createClientException(RequestHandlerException rhe) {
- if (rhe.hasResponse()) {
- ErrorResponse errorResponse = convertToType(rhe.getResponse(), ErrorResponse.class);
- if (errorResponse != null) {
- return new ConductorClientException(rhe.getStatus(), errorResponse);
- }
- }
-
- return new ConductorClientException(rhe.getMessage(), rhe.getCause());
+ private void handleRuntimeException(RuntimeException exception, URI uri) {
+ String errorMessage =
+ String.format(
+ "Unable to invoke Conductor API with uri: %s, runtime exception occurred",
+ uri);
+ LOGGER.error(errorMessage, exception);
+ throw new ConductorClientException(errorMessage, exception);
}
- private String convertToString(InputStream inputStream) {
+ private void handleUniformInterfaceException(UniformInterfaceException exception, URI uri) {
+ ClientResponse clientResponse = exception.getResponse();
+ if (clientResponse == null) {
+ throw new ConductorClientException(
+ String.format("Unable to invoke Conductor API with uri: %s", uri));
+ }
try {
- return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
- } catch (IOException e) {
- throw new ConductorClientException("Error converting response to String", e);
- } finally {
+ if (clientResponse.getStatus() < 300) {
+ return;
+ }
+ String errorMessage = clientResponse.getEntity(String.class);
+ LOGGER.warn(
+ "Unable to invoke Conductor API with uri: {}, unexpected response from server: statusCode={}, responseBody='{}'.",
+ uri,
+ clientResponse.getStatus(),
+ errorMessage);
+ ErrorResponse errorResponse;
try {
- inputStream.close();
+ errorResponse = objectMapper.readValue(errorMessage, ErrorResponse.class);
} catch (IOException e) {
- LOGGER.error("Error closing input stream", e);
+ throw new ConductorClientException(clientResponse.getStatus(), errorMessage);
}
+ throw new ConductorClientException(clientResponse.getStatus(), errorResponse);
+ } catch (ConductorClientException e) {
+ throw e;
+ } catch (ClientHandlerException e) {
+ handleClientHandlerException(e, uri);
+ } catch (RuntimeException e) {
+ handleRuntimeException(e, uri);
+ } finally {
+ clientResponse.close();
}
}
- private T convertToType(InputStream inputStream, Class responseType) {
- try {
- String value = convertToString(inputStream);
- return StringUtils.isNotBlank(value)
- ? objectMapper.readValue(value, responseType)
- : null;
- } catch (IOException e) {
- throw new ConductorClientException("Error converting response to " + responseType, e);
- }
- }
-
- private T convertToType(InputStream inputStream, TypeReference responseType) {
- try {
- String value = convertToString(inputStream);
- return StringUtils.isNotBlank(value)
- ? objectMapper.readValue(value, responseType)
- : null;
- } catch (IOException e) {
- throw new ConductorClientException("Error converting response to " + responseType, e);
+ private void handleException(URI uri, RuntimeException e) {
+ if (e instanceof UniformInterfaceException) {
+ handleUniformInterfaceException(((UniformInterfaceException) e), uri);
+ } else if (e instanceof ClientHandlerException) {
+ handleClientHandlerException((ClientHandlerException) e, uri);
+ } else {
+ handleRuntimeException(e, uri);
}
}
- /** An implementation of {@link ExternalPayloadStorage} for storing large JSON payload data. */
- class PayloadStorage implements ExternalPayloadStorage {
-
- /**
- * This method is not intended to be used in the client. The client makes a request to the
- * server to get the {@link ExternalStorageLocation}
- */
- @Override
- public ExternalStorageLocation getLocation(
- Operation operation, PayloadType payloadType, String path) {
- String url;
- switch (payloadType) {
- case WORKFLOW_INPUT:
- case WORKFLOW_OUTPUT:
- url = "workflow";
- break;
- case TASK_INPUT:
- case TASK_OUTPUT:
- url = "tasks";
- break;
- default:
- throw new ConductorClientException(
- String.format(
- "Invalid payload type: %s for operation: %s",
- payloadType, operation.toString()));
- }
- return getForEntity(
- url + "/externalstoragelocation",
- new Object[] {
- "path",
- path,
- "operation",
- operation.toString(),
- "payloadType",
- payloadType.toString()
- },
- ExternalStorageLocation.class);
- }
-
- /**
- * Uploads the payload to the uri specified.
- *
- * @param uri the location to which the object is to be uploaded
- * @param payload an {@link InputStream} containing the json payload which is to be uploaded
- * @param payloadSize the size of the json payload in bytes
- * @throws ConductorClientException if the upload fails due to an invalid path or an error
- * from external storage
- */
- @Override
- public void upload(String uri, InputStream payload, long payloadSize) {
- HttpURLConnection connection = null;
- try {
- URL url = new URI(uri).toURL();
-
- connection = (HttpURLConnection) url.openConnection();
- connection.setDoOutput(true);
- connection.setRequestMethod("PUT");
-
- try (BufferedOutputStream bufferedOutputStream =
- new BufferedOutputStream(connection.getOutputStream())) {
- long count = IOUtils.copy(payload, bufferedOutputStream);
- bufferedOutputStream.flush();
- // Check the HTTP response code
- int responseCode = connection.getResponseCode();
- if (Response.Status.fromStatusCode(responseCode).getFamily()
- != Response.Status.Family.SUCCESSFUL) {
- String errorMsg =
- String.format("Unable to upload. Response code: %d", responseCode);
- LOGGER.error(errorMsg);
- throw new ConductorClientException(errorMsg);
- }
- LOGGER.debug(
- "Uploaded {} bytes to uri: {}, with HTTP response code: {}",
- count,
- uri,
- responseCode);
- }
- } catch (URISyntaxException | MalformedURLException e) {
- String errorMsg = String.format("Invalid path specified: %s", uri);
- LOGGER.error(errorMsg, e);
- throw new ConductorClientException(errorMsg, e);
- } catch (IOException e) {
- String errorMsg = String.format("Error uploading to path: %s", uri);
- LOGGER.error(errorMsg, e);
- throw new ConductorClientException(errorMsg, e);
- } finally {
- if (connection != null) {
- connection.disconnect();
- }
- try {
- if (payload != null) {
- payload.close();
- }
- } catch (IOException e) {
- LOGGER.warn("Unable to close inputstream when uploading to uri: {}", uri);
- }
- }
+ /**
+ * Converts ClientResponse object to string with detailed debug information including status
+ * code, media type, response headers, and response body if exists.
+ */
+ private String clientResponseToString(ClientResponse response) {
+ if (response == null) {
+ return null;
}
-
- /**
- * Downloads the payload from the given uri.
- *
- * @param uri the location from where the object is to be downloaded
- * @return an inputstream of the payload in the external storage
- * @throws ConductorClientException if the download fails due to an invalid path or an error
- * from external storage
- */
- @Override
- public InputStream download(String uri) {
- HttpURLConnection connection = null;
- String errorMsg;
+ StringBuilder builder = new StringBuilder();
+ builder.append("[status: ").append(response.getStatus());
+ builder.append(", media type: ").append(response.getType());
+ if (response.getStatus() != 404) {
try {
- URL url = new URI(uri).toURL();
- connection = (HttpURLConnection) url.openConnection();
- connection.setDoOutput(false);
-
- // Check the HTTP response code
- int responseCode = connection.getResponseCode();
- if (responseCode == HttpURLConnection.HTTP_OK) {
- LOGGER.debug(
- "Download completed with HTTP response code: {}",
- connection.getResponseCode());
- return org.apache.commons.io.IOUtils.toBufferedInputStream(
- connection.getInputStream());
- }
- errorMsg = String.format("Unable to download. Response code: %d", responseCode);
- LOGGER.error(errorMsg);
- throw new ConductorClientException(errorMsg);
- } catch (URISyntaxException | MalformedURLException e) {
- errorMsg = String.format("Invalid uri specified: %s", uri);
- LOGGER.error(errorMsg, e);
- throw new ConductorClientException(errorMsg, e);
- } catch (IOException e) {
- errorMsg = String.format("Error downloading from uri: %s", uri);
- LOGGER.error(errorMsg, e);
- throw new ConductorClientException(errorMsg, e);
- } finally {
- if (connection != null) {
- connection.disconnect();
+ String responseBody = response.getEntity(String.class);
+ if (responseBody != null) {
+ builder.append(", response body: ").append(responseBody);
}
+ } catch (RuntimeException ignore) {
+ // Ignore if there is no response body, or IO error - it may have already been read
+ // in certain scenario.
}
}
+ builder.append(", response headers: ").append(response.getHeaders());
+ builder.append("]");
+ return builder.toString();
}
}
diff --git a/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java b/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java
new file mode 100644
index 0000000000..38749c1c55
--- /dev/null
+++ b/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2022 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.http;
+
+import java.net.URI;
+
+import javax.ws.rs.core.MediaType;
+
+import com.netflix.conductor.common.config.ObjectMapperProvider;
+import com.netflix.conductor.common.model.BulkResponse;
+
+import com.fasterxml.jackson.core.Version;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandler;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.filter.ClientFilter;
+
+public class ClientRequestHandler {
+ private final Client client;
+
+ public ClientRequestHandler(
+ ClientConfig config, ClientHandler handler, ClientFilter... filters) {
+ ObjectMapper objectMapper = new ObjectMapperProvider().getObjectMapper();
+
+ // https://github.com/FasterXML/jackson-databind/issues/2683
+ if (isNewerJacksonVersion()) {
+ objectMapper.registerModule(new JavaTimeModule());
+ }
+
+ JacksonJsonProvider provider = new JacksonJsonProvider(objectMapper);
+ config.getSingletons().add(provider);
+
+ if (handler == null) {
+ this.client = Client.create(config);
+ } else {
+ this.client = new Client(handler, config);
+ }
+
+ for (ClientFilter filter : filters) {
+ this.client.addFilter(filter);
+ }
+ }
+
+ public BulkResponse delete(URI uri, Object body) {
+ if (body != null) {
+ return client.resource(uri)
+ .type(MediaType.APPLICATION_JSON_TYPE)
+ .delete(BulkResponse.class, body);
+ } else {
+ client.resource(uri).delete();
+ }
+ return null;
+ }
+
+ public ClientResponse get(URI uri) {
+ return client.resource(uri)
+ .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN)
+ .get(ClientResponse.class);
+ }
+
+ public WebResource.Builder getWebResourceBuilder(URI URI, Object entity) {
+ return client.resource(URI)
+ .type(MediaType.APPLICATION_JSON)
+ .entity(entity)
+ .accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON);
+ }
+
+ private boolean isNewerJacksonVersion() {
+ Version version = com.fasterxml.jackson.databind.cfg.PackageVersion.VERSION;
+ return version.getMajorVersion() == 2 && version.getMinorVersion() >= 12;
+ }
+}
diff --git a/client/src/main/java/com/netflix/conductor/client/http/EventClient.java b/client/src/main/java/com/netflix/conductor/client/http/EventClient.java
index eed33bace0..da494ae925 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/EventClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/EventClient.java
@@ -17,44 +17,84 @@
import org.apache.commons.lang3.Validate;
import com.netflix.conductor.client.config.ConductorClientConfiguration;
+import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
import com.netflix.conductor.common.metadata.events.EventHandler;
-import com.fasterxml.jackson.core.type.TypeReference;
+import com.sun.jersey.api.client.ClientHandler;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.ClientFilter;
// Client class for all Event Handler operations
public class EventClient extends ClientBase {
-
- private static final TypeReference> eventHandlerList =
- new TypeReference>() {};
-
+ private static final GenericType> eventHandlerList =
+ new GenericType>() {};
/** Creates a default metadata client */
public EventClient() {
- this(null);
+ this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
+ }
+
+ /**
+ * @param clientConfig REST Client configuration
+ */
+ public EventClient(ClientConfig clientConfig) {
+ this(clientConfig, new DefaultConductorClientConfiguration(), null);
+ }
+
+ /**
+ * @param clientConfig REST Client configuration
+ * @param clientHandler Jersey client handler. Useful when plugging in various http client
+ * interaction modules (e.g. ribbon)
+ */
+ public EventClient(ClientConfig clientConfig, ClientHandler clientHandler) {
+ this(clientConfig, new DefaultConductorClientConfiguration(), clientHandler);
}
- public EventClient(RequestHandler requestHandler) {
- this(requestHandler, null);
+ /**
+ * @param config config REST Client configuration
+ * @param handler handler Jersey client handler. Useful when plugging in various http client
+ * interaction modules (e.g. ribbon)
+ * @param filters Chain of client side filters to be applied per request
+ */
+ public EventClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) {
+ this(config, new DefaultConductorClientConfiguration(), handler, filters);
}
+ /**
+ * @param config REST Client configuration
+ * @param clientConfiguration Specific properties configured for the client, see {@link
+ * ConductorClientConfiguration}
+ * @param handler Jersey client handler. Useful when plugging in various http client interaction
+ * modules (e.g. ribbon)
+ * @param filters Chain of client side filters to be applied per request
+ */
public EventClient(
- RequestHandler requestHandler, ConductorClientConfiguration clientConfiguration) {
- super(requestHandler, clientConfiguration);
+ ClientConfig config,
+ ConductorClientConfiguration clientConfiguration,
+ ClientHandler handler,
+ ClientFilter... filters) {
+ super(new ClientRequestHandler(config, handler, filters), clientConfiguration);
+ }
+
+ EventClient(ClientRequestHandler requestHandler) {
+ super(requestHandler, null);
}
/**
- * Register an event handler with the server.
+ * Register an event handler with the server
*
- * @param eventHandler the eventHandler definition.
+ * @param eventHandler the eventHandler definition
*/
public void registerEventHandler(EventHandler eventHandler) {
Validate.notNull(eventHandler, "Event Handler definition cannot be null");
- post("event", eventHandler);
+ postForEntityWithRequestOnly("event", eventHandler);
}
/**
- * Updates an event handler with the server.
+ * Updates an event handler with the server
*
- * @param eventHandler the eventHandler definition.
+ * @param eventHandler the eventHandler definition
*/
public void updateEventHandler(EventHandler eventHandler) {
Validate.notNull(eventHandler, "Event Handler definition cannot be null");
@@ -62,9 +102,9 @@ public void updateEventHandler(EventHandler eventHandler) {
}
/**
- * @param event name of the event.
- * @param activeOnly if true, returns only the active handlers.
- * @return Returns the list of all the event handlers for a given event.
+ * @param event name of the event
+ * @param activeOnly if true, returns only the active handlers
+ * @return Returns the list of all the event handlers for a given event
*/
public List getEventHandlers(String event, boolean activeOnly) {
Validate.notBlank(event, "Event cannot be blank");
diff --git a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java
index b141af6ac0..1ea4efebaf 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java
@@ -17,45 +17,83 @@
import org.apache.commons.lang3.Validate;
import com.netflix.conductor.client.config.ConductorClientConfiguration;
+import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
+import com.sun.jersey.api.client.ClientHandler;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.ClientFilter;
+
public class MetadataClient extends ClientBase {
/** Creates a default metadata client */
public MetadataClient() {
- this(null);
+ this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
+ }
+
+ /**
+ * @param clientConfig REST Client configuration
+ */
+ public MetadataClient(ClientConfig clientConfig) {
+ this(clientConfig, new DefaultConductorClientConfiguration(), null);
}
- public MetadataClient(RequestHandler requestHandler) {
- this(requestHandler, null);
+ /**
+ * @param clientConfig REST Client configuration
+ * @param clientHandler Jersey client handler. Useful when plugging in various http client
+ * interaction modules (e.g. ribbon)
+ */
+ public MetadataClient(ClientConfig clientConfig, ClientHandler clientHandler) {
+ this(clientConfig, new DefaultConductorClientConfiguration(), clientHandler);
}
+ /**
+ * @param config config REST Client configuration
+ * @param handler handler Jersey client handler. Useful when plugging in various http client
+ * interaction modules (e.g. ribbon)
+ * @param filters Chain of client side filters to be applied per request
+ */
+ public MetadataClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) {
+ this(config, new DefaultConductorClientConfiguration(), handler, filters);
+ }
+
+ /**
+ * @param config REST Client configuration
+ * @param clientConfiguration Specific properties configured for the client, see {@link
+ * ConductorClientConfiguration}
+ * @param handler Jersey client handler. Useful when plugging in various http client interaction
+ * modules (e.g. ribbon)
+ * @param filters Chain of client side filters to be applied per request
+ */
public MetadataClient(
- RequestHandler requestHandler, ConductorClientConfiguration clientConfiguration) {
- super(requestHandler, clientConfiguration);
+ ClientConfig config,
+ ConductorClientConfiguration clientConfiguration,
+ ClientHandler handler,
+ ClientFilter... filters) {
+ super(new ClientRequestHandler(config, handler, filters), clientConfiguration);
+ }
+
+ MetadataClient(ClientRequestHandler requestHandler) {
+ super(requestHandler, null);
}
// Workflow Metadata Operations
/**
- * Register a workflow definition with the server.
+ * Register a workflow definition with the server
*
* @param workflowDef the workflow definition
*/
public void registerWorkflowDef(WorkflowDef workflowDef) {
Validate.notNull(workflowDef, "Workflow definition cannot be null");
- post("metadata/workflow", workflowDef);
+ postForEntityWithRequestOnly("metadata/workflow", workflowDef);
}
- /**
- * Validates a workflow definition with the server.
- *
- * @param workflowDef the workflow definition
- */
public void validateWorkflowDef(WorkflowDef workflowDef) {
Validate.notNull(workflowDef, "Workflow definition cannot be null");
- post("metadata/workflow/validate", workflowDef);
+ postForEntityWithRequestOnly("metadata/workflow/validate", workflowDef);
}
/**
@@ -64,7 +102,7 @@ public void validateWorkflowDef(WorkflowDef workflowDef) {
* @param workflowDefs List of workflow definitions to be updated
*/
public void updateWorkflowDefs(List workflowDefs) {
- Validate.notNull(workflowDefs, "Worfklow defs list cannot be null");
+ Validate.notNull(workflowDefs, "Workflow defs list cannot be null");
put("metadata/workflow", null, workflowDefs);
}
@@ -106,7 +144,7 @@ public void unregisterWorkflowDef(String name, Integer version) {
*/
public void registerTaskDefs(List taskDefs) {
Validate.notNull(taskDefs, "Task defs list cannot be null");
- post("metadata/taskdefs", taskDefs);
+ postForEntityWithRequestOnly("metadata/taskdefs", taskDefs);
}
/**
diff --git a/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java b/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java
new file mode 100644
index 0000000000..0b05745f0f
--- /dev/null
+++ b/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2020 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.http;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import javax.ws.rs.core.Response;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.conductor.client.exception.ConductorClientException;
+import com.netflix.conductor.common.run.ExternalStorageLocation;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
+
+import com.amazonaws.util.IOUtils;
+
+/** An implementation of {@link ExternalPayloadStorage} for storing large JSON payload data. */
+class PayloadStorage implements ExternalPayloadStorage {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PayloadStorage.class);
+
+ private final ClientBase clientBase;
+
+ PayloadStorage(ClientBase clientBase) {
+ this.clientBase = clientBase;
+ }
+
+ /**
+ * This method is not intended to be used in the client. The client makes a request to the
+ * server to get the {@link ExternalStorageLocation}
+ */
+ @Override
+ public ExternalStorageLocation getLocation(
+ Operation operation, PayloadType payloadType, String path) {
+ String uri;
+ switch (payloadType) {
+ case WORKFLOW_INPUT:
+ case WORKFLOW_OUTPUT:
+ uri = "workflow";
+ break;
+ case TASK_INPUT:
+ case TASK_OUTPUT:
+ uri = "tasks";
+ break;
+ default:
+ throw new ConductorClientException(
+ String.format(
+ "Invalid payload type: %s for operation: %s",
+ payloadType.toString(), operation.toString()));
+ }
+ return clientBase.getForEntity(
+ String.format("%s/externalstoragelocation", uri),
+ new Object[] {
+ "path",
+ path,
+ "operation",
+ operation.toString(),
+ "payloadType",
+ payloadType.toString()
+ },
+ ExternalStorageLocation.class);
+ }
+
+ /**
+ * Uploads the payload to the uri specified.
+ *
+ * @param uri the location to which the object is to be uploaded
+ * @param payload an {@link InputStream} containing the json payload which is to be uploaded
+ * @param payloadSize the size of the json payload in bytes
+ * @throws ConductorClientException if the upload fails due to an invalid path or an error from
+ * external storage
+ */
+ @Override
+ public void upload(String uri, InputStream payload, long payloadSize) {
+ HttpURLConnection connection = null;
+ try {
+ URL url = new URI(uri).toURL();
+
+ connection = (HttpURLConnection) url.openConnection();
+ connection.setDoOutput(true);
+ connection.setRequestMethod("PUT");
+
+ try (BufferedOutputStream bufferedOutputStream =
+ new BufferedOutputStream(connection.getOutputStream())) {
+ long count = IOUtils.copy(payload, bufferedOutputStream);
+ bufferedOutputStream.flush();
+ // Check the HTTP response code
+ int responseCode = connection.getResponseCode();
+ if (Response.Status.fromStatusCode(responseCode).getFamily()
+ != Response.Status.Family.SUCCESSFUL) {
+ String errorMsg =
+ String.format("Unable to upload. Response code: %d", responseCode);
+ LOGGER.error(errorMsg);
+ throw new ConductorClientException(errorMsg);
+ }
+ LOGGER.debug(
+ "Uploaded {} bytes to uri: {}, with HTTP response code: {}",
+ count,
+ uri,
+ responseCode);
+ }
+ } catch (URISyntaxException | MalformedURLException e) {
+ String errorMsg = String.format("Invalid path specified: %s", uri);
+ LOGGER.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ } catch (IOException e) {
+ String errorMsg = String.format("Error uploading to path: %s", uri);
+ LOGGER.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ try {
+ if (payload != null) {
+ payload.close();
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Unable to close inputstream when uploading to uri: {}", uri);
+ }
+ }
+ }
+
+ /**
+ * Downloads the payload from the given uri.
+ *
+ * @param uri the location from where the object is to be downloaded
+ * @return an inputstream of the payload in the external storage
+ * @throws ConductorClientException if the download fails due to an invalid path or an error
+ * from external storage
+ */
+ @Override
+ public InputStream download(String uri) {
+ HttpURLConnection connection = null;
+ String errorMsg;
+ try {
+ URL url = new URI(uri).toURL();
+ connection = (HttpURLConnection) url.openConnection();
+ connection.setDoOutput(false);
+
+ // Check the HTTP response code
+ int responseCode = connection.getResponseCode();
+ if (responseCode == HttpURLConnection.HTTP_OK) {
+ LOGGER.debug(
+ "Download completed with HTTP response code: {}",
+ connection.getResponseCode());
+ return org.apache.commons.io.IOUtils.toBufferedInputStream(
+ connection.getInputStream());
+ }
+ errorMsg = String.format("Unable to download. Response code: %d", responseCode);
+ LOGGER.error(errorMsg);
+ throw new ConductorClientException(errorMsg);
+ } catch (URISyntaxException | MalformedURLException e) {
+ errorMsg = String.format("Invalid uri specified: %s", uri);
+ LOGGER.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ } catch (IOException e) {
+ errorMsg = String.format("Error downloading from uri: %s", uri);
+ LOGGER.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+ }
+}
diff --git a/client/src/main/java/com/netflix/conductor/client/http/RequestHandler.java b/client/src/main/java/com/netflix/conductor/client/http/RequestHandler.java
deleted file mode 100644
index 4e79bb2de0..0000000000
--- a/client/src/main/java/com/netflix/conductor/client/http/RequestHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2022 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package com.netflix.conductor.client.http;
-
-import java.io.InputStream;
-import java.net.URI;
-
-import org.jetbrains.annotations.Nullable;
-
-import com.netflix.conductor.client.exception.RequestHandlerException;
-
-public interface RequestHandler {
- void delete(URI uri) throws RequestHandlerException;
-
- @Nullable
- InputStream put(URI uri, Object body) throws RequestHandlerException;
-
- @Nullable
- InputStream post(URI uri, Object body) throws RequestHandlerException;
-
- @Nullable
- InputStream get(URI uri) throws RequestHandlerException;
-}
diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
index 40c7528e67..d624ef0f6a 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
@@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;
import com.netflix.conductor.client.config.ConductorClientConfiguration;
+import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
import com.netflix.conductor.client.exception.ConductorClientException;
import com.netflix.conductor.client.telemetry.MetricsContainer;
import com.netflix.conductor.common.metadata.tasks.PollData;
@@ -37,42 +38,83 @@
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType;
-import com.fasterxml.jackson.core.type.TypeReference;
+import com.sun.jersey.api.client.ClientHandler;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.ClientFilter;
/** Client for conductor task management including polling for task, updating task status etc. */
public class TaskClient extends ClientBase {
- private static final TypeReference> taskList = new TypeReference>() {};
+ private static final GenericType> taskList = new GenericType>() {};
- private static final TypeReference> taskExecLogList =
- new TypeReference>() {};
+ private static final GenericType> taskExecLogList =
+ new GenericType>() {};
- private static final TypeReference> pollDataList =
- new TypeReference>() {};
+ private static final GenericType> pollDataList =
+ new GenericType>() {};
- private static final TypeReference> searchResultTaskSummary =
- new TypeReference>() {};
+ private static final GenericType> searchResultTaskSummary =
+ new GenericType>() {};
- private static final TypeReference> searchResultTask =
- new TypeReference>() {};
+ private static final GenericType> searchResultTask =
+ new GenericType>() {};
- private static final TypeReference