Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.

Commit 4e993d3

Browse files
committed
issue-3719: Added disk based approach to prevent oom
1 parent 46dd3df commit 4e993d3

File tree

1 file changed

+51
-21
lines changed

1 file changed

+51
-21
lines changed

core/src/main/java/com/netflix/conductor/core/storage/DummyPayloadStorage.java

+51-21
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,23 @@
1212
*/
1313
package com.netflix.conductor.core.storage;
1414

15-
import java.io.ByteArrayInputStream;
15+
import java.io.File;
16+
import java.io.FileInputStream;
17+
import java.io.FileOutputStream;
18+
import java.io.IOException;
1619
import java.io.InputStream;
17-
import java.util.Map;
18-
import java.util.concurrent.ConcurrentHashMap;
20+
import java.nio.file.Files;
21+
import java.util.UUID;
1922

23+
import org.apache.commons.io.IOUtils;
2024
import org.slf4j.Logger;
2125
import org.slf4j.LoggerFactory;
2226

2327
import com.netflix.conductor.common.run.ExternalStorageLocation;
2428
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
2529

30+
import com.fasterxml.jackson.databind.ObjectMapper;
31+
2632
/**
2733
* A dummy implementation of {@link ExternalPayloadStorage} used when no external payload is
2834
* configured
@@ -31,40 +37,64 @@ public class DummyPayloadStorage implements ExternalPayloadStorage {
3137

3238
private static final Logger LOGGER = LoggerFactory.getLogger(DummyPayloadStorage.class);
3339

34-
private final Map<String, byte[]> dummyDataStore = new ConcurrentHashMap<>();
40+
private ObjectMapper objectMapper;
41+
private File payloadDir;
3542

36-
private static final String DUMMY_DATA_STORE_KEY = "DUMMY_PAYLOAD_STORE_KEY";
43+
public DummyPayloadStorage() {
44+
try {
45+
this.objectMapper = new ObjectMapper();
46+
this.payloadDir = Files.createTempDirectory("payloads").toFile();
47+
LOGGER.info(
48+
"{} initialized in directory: {}",
49+
this.getClass().getSimpleName(),
50+
payloadDir.getAbsolutePath());
51+
} catch (IOException ioException) {
52+
LOGGER.error(
53+
"Exception encountered while creating payloads directory : {}",
54+
ioException.getMessage());
55+
}
56+
}
3757

3858
@Override
3959
public ExternalStorageLocation getLocation(
4060
Operation operation, PayloadType payloadType, String path) {
41-
ExternalStorageLocation externalStorageLocation = new ExternalStorageLocation();
42-
externalStorageLocation.setPath(path != null ? path : "");
43-
return externalStorageLocation;
61+
ExternalStorageLocation location = new ExternalStorageLocation();
62+
location.setPath(path + UUID.randomUUID() + ".json");
63+
return location;
4464
}
4565

4666
@Override
4767
public void upload(String path, InputStream payload, long payloadSize) {
68+
File file = new File(payloadDir, path);
69+
String filePath = file.getAbsolutePath();
4870
try {
49-
final byte[] payloadBytes = new byte[(int) payloadSize];
50-
final int bytesRead = payload.read(new byte[(int) payloadSize]);
51-
52-
if (bytesRead > 0) {
53-
dummyDataStore.put(
54-
path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path, payloadBytes);
71+
if (!file.exists() && file.createNewFile()) {
72+
LOGGER.debug("Created file: {}", filePath);
73+
}
74+
IOUtils.copy(payload, new FileOutputStream(file));
75+
LOGGER.debug("Written to {}", filePath);
76+
} catch (IOException e) {
77+
// just handle this exception here and return empty map so that test will fail in case
78+
// this exception is thrown
79+
LOGGER.error("Error writing to {}", filePath);
80+
} finally {
81+
try {
82+
if (payload != null) {
83+
payload.close();
84+
}
85+
} catch (IOException e) {
86+
LOGGER.warn("Unable to close input stream when writing to file");
5587
}
56-
} catch (Exception e) {
57-
LOGGER.error("Error encountered while uploading payload {}", e.getMessage());
5888
}
5989
}
6090

6191
@Override
6292
public InputStream download(String path) {
63-
final byte[] data =
64-
dummyDataStore.get(path == null || path.isEmpty() ? DUMMY_DATA_STORE_KEY : path);
65-
if (data != null) {
66-
return new ByteArrayInputStream(data);
67-
} else {
93+
try {
94+
LOGGER.debug("Reading from {}", path);
95+
return new FileInputStream(new File(payloadDir, path));
96+
} catch (IOException e) {
97+
LOGGER.error("Error reading {}", path, e);
6898
return null;
6999
}
70100
}

0 commit comments

Comments
 (0)