diff --git a/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java b/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java index 47c94c761f..a04a43710b 100644 --- a/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java +++ b/core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java @@ -12,28 +12,90 @@ */ package com.netflix.conductor.core.storage; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; +import java.util.UUID; + +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.utils.ExternalPayloadStorage; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * A dummy implementation of {@link ExternalPayloadStorage} used when no external payload is * configured */ public class DummyPayloadStorage implements ExternalPayloadStorage { + private static final Logger LOGGER = LoggerFactory.getLogger(DummyPayloadStorage.class); + + private ObjectMapper objectMapper; + private File payloadDir; + + public DummyPayloadStorage() { + try { + this.objectMapper = new ObjectMapper(); + this.payloadDir = Files.createTempDirectory("payloads").toFile(); + LOGGER.info( + "{} initialized in directory: {}", + this.getClass().getSimpleName(), + payloadDir.getAbsolutePath()); + } catch (IOException ioException) { + LOGGER.error( + "Exception encountered while creating payloads directory : {}", + ioException.getMessage()); + } + } + @Override public ExternalStorageLocation getLocation( Operation operation, PayloadType payloadType, String path) { - return null; + ExternalStorageLocation location = new ExternalStorageLocation(); + location.setPath(path + UUID.randomUUID() + ".json"); + return location; } @Override - public void upload(String path, InputStream payload, long payloadSize) {} + public void upload(String path, InputStream payload, long payloadSize) { + File file = new File(payloadDir, path); + String filePath = file.getAbsolutePath(); + try { + if (!file.exists() && file.createNewFile()) { + LOGGER.debug("Created file: {}", filePath); + } + IOUtils.copy(payload, new FileOutputStream(file)); + LOGGER.debug("Written to {}", filePath); + } catch (IOException e) { + // just handle this exception here and return empty map so that test will fail in case + // this exception is thrown + LOGGER.error("Error writing to {}", filePath); + } finally { + try { + if (payload != null) { + payload.close(); + } + } catch (IOException e) { + LOGGER.warn("Unable to close input stream when writing to file"); + } + } + } @Override public InputStream download(String path) { - return null; + try { + LOGGER.debug("Reading from {}", path); + return new FileInputStream(new File(payloadDir, path)); + } catch (IOException e) { + LOGGER.error("Error reading {}", path, e); + return null; + } } } diff --git a/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java b/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java new file mode 100644 index 0000000000..e5fd2c04d2 --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/core/storage/DummyPayloadStorageTest.java @@ -0,0 +1,92 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.storage; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.junit.Before; +import org.junit.Test; + +import com.netflix.conductor.common.run.ExternalStorageLocation; +import com.netflix.conductor.common.utils.ExternalPayloadStorage; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import static com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class DummyPayloadStorageTest { + + private DummyPayloadStorage dummyPayloadStorage; + + private static final String TEST_STORAGE_PATH = "test-storage"; + + private ExternalStorageLocation location; + + private ObjectMapper objectMapper; + + public static final String MOCK_PAYLOAD = "{\n" + "\"output\": \"TEST_OUTPUT\",\n" + "}\n"; + + @Before + public void setup() { + dummyPayloadStorage = new DummyPayloadStorage(); + objectMapper = new ObjectMapper(); + location = + dummyPayloadStorage.getLocation( + ExternalPayloadStorage.Operation.WRITE, + PayloadType.TASK_OUTPUT, + TEST_STORAGE_PATH); + try { + byte[] payloadBytes = MOCK_PAYLOAD.getBytes("UTF-8"); + dummyPayloadStorage.upload( + location.getPath(), + new ByteArrayInputStream(payloadBytes), + payloadBytes.length); + } catch (UnsupportedEncodingException unsupportedEncodingException) { + } + } + + @Test + public void testGetLocationNotNull() { + assertNotNull(location); + } + + @Test + public void testDownloadForValidPath() { + try (InputStream inputStream = dummyPayloadStorage.download(location.getPath())) { + Map payload = + objectMapper.readValue( + IOUtils.toString(inputStream, StandardCharsets.UTF_8), Map.class); + assertTrue(payload.containsKey("output")); + assertEquals(payload.get("output"), "TEST_OUTPUT"); + } catch (Exception e) { + assertTrue(e instanceof IOException); + } + } + + @Test + public void testDownloadForInvalidPath() { + InputStream inputStream = dummyPayloadStorage.download("testPath"); + assertNull(inputStream); + } +}