Skip to content

Commit

Permalink
Updated integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
Elwizzy12 committed Aug 12, 2024
1 parent 999ac84 commit f315dd1
Show file tree
Hide file tree
Showing 15 changed files with 253 additions and 228 deletions.
26 changes: 1 addition & 25 deletions datasafe-business/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,36 +108,12 @@
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
<version>2.26.22</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<version>2.26.22</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
<version>2.26.22</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>2.26.22</version>
<scope>test</scope>

</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.26.22</version>
<scope>test</scope>
<scope>runtime</scope>
</dependency>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,13 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.UnrecoverableKeyException;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -118,13 +114,12 @@ static void initDistributedMinios() {
log.info("ENDPOINT IS {}", endpoint);
endpointsByHostNoBucket.put(it, endpoint);

S3Client client = S3Client.builder()
.endpointOverride(URI.create(endpoint))
.region(Region.of(REGION))
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKey(it), secretKey(it))
))
.build();
S3Client client = S3ClientFactory.getClient(
endpoint,
REGION,
accessKey(it),
secretKey(it)
);

AwsClientRetry.createBucketWithRetry(client, it);
});
Expand Down Expand Up @@ -297,19 +292,17 @@ private void registerUser(UserIDAuth auth) {
}

private List<String> listInBucket(String bucket) {
S3Client client = S3Client.builder()
.endpointOverride(URI.create(endpointsByHostNoBucket.get(bucket)))
.region(Region.of(REGION))
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKey(bucket), secretKey(bucket))
))
.build();

S3Client s3 = S3ClientFactory.getClient(
endpointsByHostNoBucket.get(bucket),
REGION,
accessKey(bucket),
secretKey(bucket)
);
ListObjectsV2Request request = ListObjectsV2Request.builder()
.bucket(bucket)
.build();
ListObjectsV2Response response = s3.listObjectsV2(request);

ListObjectsV2Response response = client.listObjectsV2(request);
return response.contents()
.stream()
.map(S3Object::key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,15 @@ private static StorageService httpS3() {

private static StorageService amazonS3() {
return new UriBasedAuthStorageService(
acc -> getStorageService(
acc.getAccessKey(),
acc.getSecretKey(),
acc.getEndpoint(),
acc.getRegion(),
acc.getBucketName()
acc -> new S3StorageService(
S3ClientFactory.getAmazonClient(
acc.getRegion(),
acc.getAccessKey(),
acc.getSecretKey()
),
// Bucket name is encoded in first path segment
acc.getBucketName(),
ExecutorServiceUtil.submitterExecutesOnStarvationExecutingService()
),
uri -> (uri.getHost() + "/" + uri.getPath().replaceFirst("^/", "")).split("/")
);
Expand All @@ -103,13 +106,15 @@ private WithCredentialProvider(Lazy<StorageKeyStoreOperations> storageKeyStoreOp

private static S3StorageService getStorageService(String accessKey, String secretKey, String url, String region,
String bucket) {
AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey);
S3Client s3 = S3Client.builder()
.endpointOverride(URI.create(url))
.region(Region.of(region))
.credentialsProvider(StaticCredentialsProvider.create(creds))
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKey, secretKey)
))
.build();


return new S3StorageService(
s3,
bucket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
},
{
"pattern": "lib/commons-logging-1.1.3.jar"
},
}
,
{
"pattern": "lib/dagger-2.17.jar"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,10 @@ public class BasicS3Factory implements S3Factory {

@Override
public S3Client getClient(String endpointUrl, String region, String accessKey, String secretKey) {
return S3Client.builder()
.endpointOverride(URI.create(endpointUrl))
.region(Region.of(region))
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)))
.build();
return S3ClientFactory.getClient(endpointUrl, region, accessKey, secretKey);
}
@Override
public S3Client getAmazonClient(String region, String accessKey, String secretKey) {
return S3Client.builder()
.region(Region.of(region))
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)))
.build();
return S3ClientFactory.getAmazonClient(region, accessKey, secretKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ datasafe.dbPassword=${MYSQL_PASSWORD}

spring.liquibase.enabled=false


datasafe.encryption.keystore.type=BCFKS
datasafe.encryption.keystore.encryptionAlgo=AES256_KWP
datasafe.encryption.keystore.pbkdf.scrypt.cost=16384
Expand Down
10 changes: 5 additions & 5 deletions datasafe-storage/datasafe-storage-impl-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@
<artifactId>s3</artifactId>
<version>2.26.22</version>
</dependency>





<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<version>2.26.31</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,18 @@
import de.adorsys.datasafe.types.api.callback.ResourceWriteCallback;
import de.adorsys.datasafe.types.api.utils.CustomizableByteArrayOutputStream;
import de.adorsys.datasafe.types.api.utils.Obfuscate;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
Expand All @@ -53,7 +52,6 @@ public class MultipartUploadS3StorageOutputStream extends OutputStream {

private S3Client s3;

private String multiPartUploadId;

// The minimum size for a multi part request is 5 MB, hence the buffer size of 5 MB
static final int BUFFER_SIZE = 1024 * 1024 * 5;
Expand All @@ -70,14 +68,13 @@ public class MultipartUploadS3StorageOutputStream extends OutputStream {
private final List<? extends ResourceWriteCallback> callbacks;

public MultipartUploadS3StorageOutputStream(String bucketName, String objectKey, S3Client s3,
ExecutorService executorService, String multiPartUploadId,
ExecutorService executorService,
List<? extends ResourceWriteCallback> callbacks) {
this.bucketName = bucketName;
this.objectName = objectKey;
this.s3 = s3;
this.completionService = new ExecutorCompletionService<>(executorService);
this.callbacks = callbacks;
this.multiPartUploadId = multiPartUploadId;

log.debug("Write to bucket: {} with name: {}", Obfuscate.secure(bucketName), Obfuscate.secure(objectName));
}
Expand Down Expand Up @@ -140,7 +137,7 @@ private void initiateMultipartRequestAndCommitPartIfNeeded() {
.contentSize(size)
.bucketName(bucketName)
.objectName(objectName)
.uploadId(multiPartUploadId)
.uploadId(multiPartUploadResult.uploadId())
.chunkNumberCounter(partCounter)
.lastChunk(false)
.build()
Expand All @@ -155,22 +152,26 @@ private boolean isMultiPartUpload() {

@SneakyThrows
private void finishSimpleUpload() {
// ObjectMetadata objectMetadata = ObjectMetadata.builder()
// .contentLength(currentOutputStream.size())
// .build();
int size = currentOutputStream.size();
byte[] content = currentOutputStream.getBufferOrCopy();

// Release the memory
currentOutputStream = null;

PutObjectResponse upload = s3.putObject(
PutObjectRequest.builder()
.bucket(bucketName)
.key(objectName)
.contentLength((long) content.length)
.build(),
RequestBody.fromBytes(content));
notifyCommittedVersionIfPresent(upload.versionId());
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(bucketName)
.key(objectName)
.contentLength((long) size)
.build();
try {
s3.putObject(putObjectRequest, RequestBody.fromInputStream(new ByteArrayInputStream(content, 0, size), size));
} catch (Exception e) {
log.error("Failed to put object to S3: {}", e.getMessage(), e);
throw e;
}


notifyCommittedVersionIfPresent(null);

log.debug("Finished simple upload");
}
Expand All @@ -179,18 +180,21 @@ private void finishMultiPartUpload() throws IOException {
sendLastChunkOfMultipartIfNeeded();

try {
List<CompletedPart> partETags = getMultiPartsUploadResults();
List<CompletedPart> completedParts = getMultiPartsUploadResults();

CompleteMultipartUploadResponse upload = s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
log.debug("Send multipart request to S3");
CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucketName)
.key(objectName)
.uploadId(multiPartUploadId)
.uploadId(multiPartUploadResult.uploadId())
.multipartUpload(CompletedMultipartUpload.builder()
.parts(partETags)
.parts(completedParts)
.build())
.build());
.build();

CompleteMultipartUploadResponse uploadResponse = s3.completeMultipartUpload(completeRequest);

notifyCommittedVersionIfPresent(upload.versionId());
notifyCommittedVersionIfPresent(uploadResponse.eTag());

log.debug("Finished multi part upload");
} catch (ExecutionException e) {
Expand Down Expand Up @@ -226,7 +230,7 @@ private void sendLastChunkOfMultipartIfNeeded() {
.contentSize(size)
.bucketName(bucketName)
.objectName(objectName)
.uploadId(multiPartUploadId)
.uploadId(multiPartUploadResult.uploadId())
.chunkNumberCounter(partCounter)
.lastChunk(true)
.build()
Expand All @@ -248,45 +252,49 @@ private void initiateMultiPartIfNeeded() {
if (multiPartUploadResult == null) {

log.debug("Initiate multi part");
CreateMultipartUploadResponse multiPartUploadResult = s3.createMultipartUpload(
CreateMultipartUploadRequest.builder()
.bucket(bucketName)
.key(objectName)
.build()
);
this.multiPartUploadResult = multiPartUploadResult;
this.multiPartUploadId = multiPartUploadResult.uploadId();
CreateMultipartUploadRequest initiateRequest = CreateMultipartUploadRequest.builder()
.bucket(bucketName)
.key(objectName)
.build();

try {
multiPartUploadResult = s3.createMultipartUpload(initiateRequest);
} catch (S3Exception e) {
log.error("Failed to initiate multipart upload", e);
// Handle the exception as needed
throw e; // or handle accordingly
}
}
}

private void abortMultiPartUpload() {
log.debug("Abort multi part");

if (isMultiPartUpload()) {
s3.abortMultipartUpload(
AbortMultipartUploadRequest.builder()
.bucket(bucketName)
.key(objectName)
.uploadId(multiPartUploadId)
.build()
);
AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
.bucket(bucketName)
.key(objectName)
.uploadId(multiPartUploadResult.uploadId())
.build();

s3.abortMultipartUpload(abortRequest);
}
}

private List<CompletedPart> getMultiPartsUploadResults() throws ExecutionException, InterruptedException {
List<CompletedPart> result = new ArrayList<>(partCounter);
for (int i = 0; i < partCounter; i++) {
UploadPartResponse partResponse = completionService.take().get();
int partNumber = i + 1; // Part numbers start from 1
for (int i = 1; i <= partCounter; i++) {
UploadPartResponse completedPart = completionService.take().get();
result.add(CompletedPart.builder()
.partNumber(partNumber)
.eTag(partResponse.eTag())
.partNumber(i)
.eTag(completedPart.eTag())
.build());

log.debug("Get upload part #{} from {}", i, partCounter);
}
return result;
}

private CustomizableByteArrayOutputStream newOutputStream() {
return new CustomizableByteArrayOutputStream(32, BUFFER_SIZE, 0.5);
}
Expand Down
Loading

0 comments on commit f315dd1

Please sign in to comment.