From 878a5128c2c6606393f33bbb64947ff8192f9d1a Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Sat, 2 Sep 2023 03:30:27 +0530 Subject: [PATCH 1/5] issue-3719: Fix for http task stuck due to NPE --- .../core/storage/DummyPayloadStorage.java | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java b/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java index 47c94c761f..8ce2a3a215 100644 --- a/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java +++ b/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java @@ -12,10 +12,15 @@ */ package com.netflix.conductor.core.storage; +import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.utils.ExternalPayloadStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A dummy implementation of {@link ExternalPayloadStorage} used when no external payload is @@ -23,17 +28,41 @@ */ public class DummyPayloadStorage implements ExternalPayloadStorage { + private static final Logger LOGGER = LoggerFactory.getLogger(DummyPayloadStorage.class); + + private final Map dummyDataStore = new ConcurrentHashMap<>(); + + private static final String DUMMY_DATA_STORE_KEY = "DUMMY_PAYLOAD_STORE_KEY"; + @Override public ExternalStorageLocation getLocation( Operation operation, PayloadType payloadType, String path) { - return null; + ExternalStorageLocation externalStorageLocation = new ExternalStorageLocation(); + externalStorageLocation.setPath(path != null ? path : ""); + return externalStorageLocation; } @Override - public void upload(String path, InputStream payload, long payloadSize) {} + public void upload(String path, InputStream payload, long payloadSize) { + try { + final byte[] payloadBytes = new byte[(int) payloadSize]; + final int bytesRead = payload.read(new byte[(int) payloadSize]); + + if (bytesRead > 0) { + dummyDataStore.put(path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path, payloadBytes); + } + } catch (Exception e) { + LOGGER.error("Error encountered while uploading payload {}", e.getMessage()); + } + } @Override public InputStream download(String path) { - return null; + final byte[] data = dummyDataStore.get(path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path); + if (data != null) { + return new ByteArrayInputStream(data); + } else { + return null; + } } } From 46dd3df90320912eddcae050738bc19a8ed9702e Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Sat, 2 Sep 2023 04:02:40 +0530 Subject: [PATCH 2/5] issue-3719: Fix format violations --- .../conductor/core/storage/DummyPayloadStorage.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java b/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java index 8ce2a3a215..2db4276012 100644 --- a/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java +++ b/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java @@ -17,11 +17,12 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import com.netflix.conductor.common.run.ExternalStorageLocation; -import com.netflix.conductor.common.utils.ExternalPayloadStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.netflix.conductor.common.run.ExternalStorageLocation; +import com.netflix.conductor.common.utils.ExternalPayloadStorage; + /** * A dummy implementation of {@link ExternalPayloadStorage} used when no external payload is * configured @@ -49,7 +50,8 @@ public void upload(String path, InputStream payload, long payloadSize) { final int bytesRead = payload.read(new byte[(int) payloadSize]); if (bytesRead > 0) { - dummyDataStore.put(path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path, payloadBytes); + dummyDataStore.put( + path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path, payloadBytes); } } catch (Exception e) { LOGGER.error("Error encountered while uploading payload {}", e.getMessage()); @@ -58,7 +60,8 @@ public void upload(String path, InputStream payload, long payloadSize) { @Override public InputStream download(String path) { - final byte[] data = dummyDataStore.get(path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path); + final byte[] data = + dummyDataStore.get(path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path); if (data != null) { return new ByteArrayInputStream(data); } else { From 4e993d354c16a8e375a3150af038b8b73b84cbd6 Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Sat, 2 Sep 2023 13:44:08 +0530 Subject: [PATCH 3/5] issue-3719: Added disk based approach to prevent oom --- .../core/storage/DummyPayloadStorage.java | 72 +++++++++++++------ 1 file changed, 51 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java b/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java index 2db4276012..a04a43710b 100644 --- a/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java +++ b/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java @@ -12,17 +12,23 @@ */ package com.netflix.conductor.core.storage; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.io.InputStream; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.nio.file.Files; +import java.util.UUID; +import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.utils.ExternalPayloadStorage; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * A dummy implementation of {@link ExternalPayloadStorage} used when no external payload is * configured @@ -31,40 +37,64 @@ public class DummyPayloadStorage implements ExternalPayloadStorage { private static final Logger LOGGER = LoggerFactory.getLogger(DummyPayloadStorage.class); - private final Map dummyDataStore = new ConcurrentHashMap<>(); + private ObjectMapper objectMapper; + private File payloadDir; - private static final String DUMMY_DATA_STORE_KEY = "DUMMY_PAYLOAD_STORE_KEY"; + public DummyPayloadStorage() { + try { + this.objectMapper = new ObjectMapper(); + this.payloadDir = Files.createTempDirectory("payloads").toFile(); + LOGGER.info( + "{} initialized in directory: {}", + this.getClass().getSimpleName(), + payloadDir.getAbsolutePath()); + } catch (IOException ioException) { + LOGGER.error( + "Exception encountered while creating payloads directory : {}", + ioException.getMessage()); + } + } @Override public ExternalStorageLocation getLocation( Operation operation, PayloadType payloadType, String path) { - ExternalStorageLocation externalStorageLocation = new ExternalStorageLocation(); - externalStorageLocation.setPath(path != null ? path : ""); - return externalStorageLocation; + ExternalStorageLocation location = new ExternalStorageLocation(); + location.setPath(path + UUID.randomUUID() + ".json"); + return location; } @Override public void upload(String path, InputStream payload, long payloadSize) { + File file = new File(payloadDir, path); + String filePath = file.getAbsolutePath(); try { - final byte[] payloadBytes = new byte[(int) payloadSize]; - final int bytesRead = payload.read(new byte[(int) payloadSize]); - - if (bytesRead > 0) { - dummyDataStore.put( - path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path, payloadBytes); + if (!file.exists() && file.createNewFile()) { + LOGGER.debug("Created file: {}", filePath); + } + IOUtils.copy(payload, new FileOutputStream(file)); + LOGGER.debug("Written to {}", filePath); + } catch (IOException e) { + // just handle this exception here and return empty map so that test will fail in case + // this exception is thrown + LOGGER.error("Error writing to {}", filePath); + } finally { + try { + if (payload != null) { + payload.close(); + } + } catch (IOException e) { + LOGGER.warn("Unable to close input stream when writing to file"); } - } catch (Exception e) { - LOGGER.error("Error encountered while uploading payload {}", e.getMessage()); } } @Override public InputStream download(String path) { - final byte[] data = - dummyDataStore.get(path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path); - if (data != null) { - return new ByteArrayInputStream(data); - } else { + try { + LOGGER.debug("Reading from {}", path); + return new FileInputStream(new File(payloadDir, path)); + } catch (IOException e) { + LOGGER.error("Error reading {}", path, e); return null; } } From 13a689d133a414e223e19fc3a7ee75a094a12d56 Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Tue, 5 Sep 2023 18:18:23 +0530 Subject: [PATCH 4/5] issue-3719: Added unit testcases to verify download functionality --- .../core/storage/DummyPayloadStorageTest.java | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java diff --git a/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java b/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java new file mode 100644 index 0000000000..e2120b88d1 --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.storage; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.junit.Before; +import org.junit.Test; + +import com.netflix.conductor.common.run.ExternalStorageLocation; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import static com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; + +public class DummyPayloadStorageTest { + + private DummyPayloadStorage dummyPayloadStorage; + + private static final String TEST_STORAGE_PATH = "test-storage"; + + private ExternalStorageLocation location; + + private ObjectMapper objectMapper; + + public static final String MOCK_PAYLOAD = "{\n" + "\"output\": \"TEST_OUTPUT\",\n" + "}\n"; + + @Before + public void setup() { + dummyPayloadStorage = new DummyPayloadStorage(); + objectMapper = new ObjectMapper(); + location = + dummyPayloadStorage.getLocation(any(), PayloadType.TASK_OUTPUT, TEST_STORAGE_PATH); + try { + byte[] payloadBytes = MOCK_PAYLOAD.getBytes("UTF-8"); + dummyPayloadStorage.upload( + location.getPath(), + new ByteArrayInputStream(payloadBytes), + payloadBytes.length); + } catch (UnsupportedEncodingException unsupportedEncodingException) { + } + } + + @Test + public void testGetLocationNotNull() { + assertNotNull(location); + } + + @Test + public void testDownloadForValidPath() { + try (InputStream inputStream = dummyPayloadStorage.download(location.getPath())) { + Map payload = + objectMapper.readValue( + IOUtils.toString(inputStream, StandardCharsets.UTF_8), Map.class); + assertTrue(payload.containsKey("output")); + assertEquals(payload.get("output"), "TEST_OUTPUT"); + } catch (Exception e) { + assertTrue(e instanceof IOException); + } + } + + @Test + public void testDownloadForInvalidPath() { + InputStream inputStream = dummyPayloadStorage.download("testPath"); + assertNull(inputStream); + } +} From 1735708367db2c65db40130484a07ac4b8eac996 Mon Sep 17 00:00:00 2001 From: saksham2105 Date: Fri, 8 Sep 2023 01:00:19 +0530 Subject: [PATCH 5/5] issue-3719: removed any from location in test setup --- .../conductor/core/storage/DummyPayloadStorageTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java b/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java index e2120b88d1..e5fd2c04d2 100644 --- a/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java +++ b/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import com.netflix.conductor.common.run.ExternalStorageLocation; +import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.fasterxml.jackson.databind.ObjectMapper; @@ -33,7 +34,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; public class DummyPayloadStorageTest { @@ -52,7 +52,10 @@ public void setup() { dummyPayloadStorage = new DummyPayloadStorage(); objectMapper = new ObjectMapper(); location = - dummyPayloadStorage.getLocation(any(), PayloadType.TASK_OUTPUT, TEST_STORAGE_PATH); + dummyPayloadStorage.getLocation( + ExternalPayloadStorage.Operation.WRITE, + PayloadType.TASK_OUTPUT, + TEST_STORAGE_PATH); try { byte[] payloadBytes = MOCK_PAYLOAD.getBytes("UTF-8"); dummyPayloadStorage.upload(