Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3756 from saksham2105/http-task-stuck-fix
Browse files Browse the repository at this point in the history
issue-3719: Fix for http task stuck due to NPE
  • Loading branch information
v1r3n authored Sep 27, 2023
2 parents 22ad050 + 1735708 commit 5ed77fb
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,90 @@
*/
package com.netflix.conductor.core.storage;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.UUID;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* A dummy implementation of {@link ExternalPayloadStorage} used when no external payload is
* configured
*/
public class DummyPayloadStorage implements ExternalPayloadStorage {

private static final Logger LOGGER = LoggerFactory.getLogger(DummyPayloadStorage.class);

private ObjectMapper objectMapper;
private File payloadDir;

public DummyPayloadStorage() {
try {
this.objectMapper = new ObjectMapper();
this.payloadDir = Files.createTempDirectory("payloads").toFile();
LOGGER.info(
"{} initialized in directory: {}",
this.getClass().getSimpleName(),
payloadDir.getAbsolutePath());
} catch (IOException ioException) {
LOGGER.error(
"Exception encountered while creating payloads directory : {}",
ioException.getMessage());
}
}

@Override
public ExternalStorageLocation getLocation(
Operation operation, PayloadType payloadType, String path) {
return null;
ExternalStorageLocation location = new ExternalStorageLocation();
location.setPath(path + UUID.randomUUID() + ".json");
return location;
}

@Override
public void upload(String path, InputStream payload, long payloadSize) {}
public void upload(String path, InputStream payload, long payloadSize) {
File file = new File(payloadDir, path);
String filePath = file.getAbsolutePath();
try {
if (!file.exists() && file.createNewFile()) {
LOGGER.debug("Created file: {}", filePath);
}
IOUtils.copy(payload, new FileOutputStream(file));
LOGGER.debug("Written to {}", filePath);
} catch (IOException e) {
// just handle this exception here and return empty map so that test will fail in case
// this exception is thrown
LOGGER.error("Error writing to {}", filePath);
} finally {
try {
if (payload != null) {
payload.close();
}
} catch (IOException e) {
LOGGER.warn("Unable to close input stream when writing to file");
}
}
}

@Override
public InputStream download(String path) {
return null;
try {
LOGGER.debug("Reading from {}", path);
return new FileInputStream(new File(payloadDir, path));
} catch (IOException e) {
LOGGER.error("Error reading {}", path, e);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.storage;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.junit.Before;
import org.junit.Test;

import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;

import com.fasterxml.jackson.databind.ObjectMapper;

import static com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public class DummyPayloadStorageTest {

private DummyPayloadStorage dummyPayloadStorage;

private static final String TEST_STORAGE_PATH = "test-storage";

private ExternalStorageLocation location;

private ObjectMapper objectMapper;

public static final String MOCK_PAYLOAD = "{\n" + "\"output\": \"TEST_OUTPUT\",\n" + "}\n";

@Before
public void setup() {
dummyPayloadStorage = new DummyPayloadStorage();
objectMapper = new ObjectMapper();
location =
dummyPayloadStorage.getLocation(
ExternalPayloadStorage.Operation.WRITE,
PayloadType.TASK_OUTPUT,
TEST_STORAGE_PATH);
try {
byte[] payloadBytes = MOCK_PAYLOAD.getBytes("UTF-8");
dummyPayloadStorage.upload(
location.getPath(),
new ByteArrayInputStream(payloadBytes),
payloadBytes.length);
} catch (UnsupportedEncodingException unsupportedEncodingException) {
}
}

@Test
public void testGetLocationNotNull() {
assertNotNull(location);
}

@Test
public void testDownloadForValidPath() {
try (InputStream inputStream = dummyPayloadStorage.download(location.getPath())) {
Map<String, Object> payload =
objectMapper.readValue(
IOUtils.toString(inputStream, StandardCharsets.UTF_8), Map.class);
assertTrue(payload.containsKey("output"));
assertEquals(payload.get("output"), "TEST_OUTPUT");
} catch (Exception e) {
assertTrue(e instanceof IOException);
}
}

@Test
public void testDownloadForInvalidPath() {
InputStream inputStream = dummyPayloadStorage.download("testPath");
assertNull(inputStream);
}
}

0 comments on commit 5ed77fb

Please sign in to comment.