Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -95,6 +92,8 @@ public void kill(List<DataSegment> segments) throws SegmentLoadingException

// create a map of bucket to keys to delete
Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete = new HashMap<>();
boolean shouldThrowException = false;

for (DataSegment segment : segments) {
String s3Bucket = MapUtils.getString(segment.getLoadSpec(), S3DataSegmentPuller.BUCKET);
String path = MapUtils.getString(segment.getLoadSpec(), S3DataSegmentPuller.KEY);
Expand All @@ -104,19 +103,16 @@ public void kill(List<DataSegment> segments) throws SegmentLoadingException
);
if (path.endsWith("/")) {
// segment is not compressed, list objects and add them all to delete list
final ListObjectsV2Result list = s3Client.listObjectsV2(
new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(path)
);
for (S3ObjectSummary objectSummary : list.getObjectSummaries()) {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(objectSummary.getKey()));
boolean hadException = deleteSegmentFilesFromS3(s3Bucket, path);
if (hadException) {
shouldThrowException = true;
}
} else {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
}
}

boolean shouldThrowException = false;
for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys : bucketToKeysToDelete.entrySet()) {
String s3Bucket = bucketToKeys.getKey();
List<DeleteObjectsRequest.KeyVersion> keysToDelete = bucketToKeys.getValue();
Expand Down Expand Up @@ -212,6 +208,24 @@ private boolean deleteKeysForBucket(
return hadException;
}

private boolean deleteSegmentFilesFromS3(String s3Bucket, String s3Path)
{
try {
S3Utils.deleteObjectsInPath(
s3ClientSupplier.get(),
inputDataConfig.getMaxListingLength(),
s3Bucket,
s3Path,
Predicates.alwaysTrue()
);
}
catch (Exception e) {
log.error("Error occurred while deleting segment files from s3. Error: %s", e.getMessage());
return true;
}
return false;
}

@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
Expand All @@ -223,12 +237,17 @@ public void kill(DataSegment segment) throws SegmentLoadingException

if (s3Path.endsWith("/")) {
// segment is not compressed, list objects and delete them all
final ListObjectsV2Result list = s3Client.listObjectsV2(
new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(s3Path)
);
for (S3ObjectSummary objectSummary : list.getObjectSummaries()) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, objectSummary.getKey());
s3Client.deleteObject(s3Bucket, objectSummary.getKey());
try {
S3Utils.deleteObjectsInPath(
s3ClientSupplier.get(),
inputDataConfig.getMaxListingLength(),
s3Bucket,
s3Path,
Predicates.alwaysTrue()
);
}
catch (Exception e) {
log.error("Error occurred while deleting segment files from s3. Error: %s", e.getMessage());
}
} else {
String s3DescriptorPath = DataSegmentKiller.descriptorPath(s3Path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.model.S3ObjectSummary;
Expand All @@ -39,7 +38,6 @@
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.LogicalOperator;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -481,132 +479,153 @@ public void test_kill_listOfSegments_unexpectedExceptionIsThrown()
@Test
public void test_kill_not_zipped() throws SegmentLoadingException
{
ListObjectsV2Result list = EasyMock.createMock(ListObjectsV2Result.class);
S3ObjectSummary objectSummary = EasyMock.createMock(S3ObjectSummary.class);
EasyMock.expect(objectSummary.getBucketName()).andReturn(TEST_BUCKET).anyTimes();
EasyMock.expect(objectSummary.getKey()).andReturn(KEY_1 + "/meta.smoosh").anyTimes();
S3ObjectSummary objectSummary2 = EasyMock.createMock(S3ObjectSummary.class);
EasyMock.expect(objectSummary2.getBucketName()).andReturn(TEST_BUCKET).anyTimes();
EasyMock.expect(objectSummary2.getKey()).andReturn(KEY_1 + "/00000.smoosh").anyTimes();
EasyMock.expect(list.getObjectSummaries()).andReturn(List.of(objectSummary, objectSummary2)).once();
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject())).andReturn(list).once();
s3Client.deleteObject(TEST_BUCKET, KEY_1 + "/00000.smoosh");
EasyMock.expectLastCall().andVoid();
s3Client.deleteObject(TEST_BUCKET, KEY_1 + "/meta.smoosh");
EasyMock.expectLastCall().andVoid();
URI segment1Uri = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, KEY_1 + "/"));
S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1 + "/00000.smoosh", TIME_0);
S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1 + "/meta.smoosh", TIME_0);

S3TestUtils.expectListObjects(s3Client, segment1Uri, ImmutableList.of(objectSummary1, objectSummary2));

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig, objectSummary, objectSummary2, list);
// With MAX_KEYS=1, deleteObjectsInPath batches deletes one key at a time, so two deleteObjects calls
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withKeys(ImmutableList.of(new DeleteObjectsRequest.KeyVersion(KEY_1 + "/00000.smoosh")));
DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET)
.withKeys(ImmutableList.of(new DeleteObjectsRequest.KeyVersion(KEY_1 + "/meta.smoosh")));

S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
ImmutableList.of(deleteRequest1, deleteRequest2),
ImmutableMap.of()
);

EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET).anyTimes();
EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX).anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS).anyTimes();

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(List.of(DATA_SEGMENT_1_NO_ZIP));
EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig, objectSummary, objectSummary2, list);
EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig);
}

@Test
public void test_kill_not_zipped_multi() throws SegmentLoadingException
{
ListObjectsV2Result list = EasyMock.createMock(ListObjectsV2Result.class);
S3ObjectSummary objectSummary11 = EasyMock.createMock(S3ObjectSummary.class);
EasyMock.expect(objectSummary11.getBucketName()).andReturn(TEST_BUCKET).anyTimes();
EasyMock.expect(objectSummary11.getKey()).andReturn(KEY_1 + "/meta.smoosh").anyTimes();
S3ObjectSummary objectSummary12 = EasyMock.createMock(S3ObjectSummary.class);
EasyMock.expect(objectSummary12.getBucketName()).andReturn(TEST_BUCKET).anyTimes();
EasyMock.expect(objectSummary12.getKey()).andReturn(KEY_1 + "/00000.smoosh").anyTimes();
EasyMock.expect(list.getObjectSummaries()).andReturn(List.of(objectSummary11, objectSummary12)).once();

ListObjectsV2Result list2 = EasyMock.createMock(ListObjectsV2Result.class);
S3ObjectSummary objectSummary21 = EasyMock.createMock(S3ObjectSummary.class);
EasyMock.expect(objectSummary21.getBucketName()).andReturn(TEST_BUCKET).anyTimes();
EasyMock.expect(objectSummary21.getKey()).andReturn(KEY_2 + "/meta.smoosh").anyTimes();
S3ObjectSummary objectSummary22 = EasyMock.createMock(S3ObjectSummary.class);
EasyMock.expect(objectSummary22.getBucketName()).andReturn(TEST_BUCKET).anyTimes();
EasyMock.expect(objectSummary22.getKey()).andReturn(KEY_2 + "/00000.smoosh").anyTimes();
EasyMock.expect(list2.getObjectSummaries()).andReturn(List.of(objectSummary21, objectSummary22)).once();
EasyMock.expect(
s3Client.listObjectsV2(
EasyMock.cmp(
new ListObjectsV2Request().withBucketName(TEST_BUCKET)
.withPrefix(KEY_1 + "/"),
(o1, o2) -> {
if (!o1.getBucketName().equals(o2.getBucketName())) {
return o1.getBucketName().compareTo(o2.getBucketName());
}
return o1.getPrefix().compareTo(o2.getPrefix());
},
LogicalOperator.EQUAL
)
)
).andReturn(list).once();
EasyMock.expect(
s3Client.listObjectsV2(
EasyMock.cmp(
new ListObjectsV2Request().withBucketName(TEST_BUCKET)
.withPrefix(KEY_2 + "/"),
(o1, o2) -> {
if (!o1.getBucketName().equals(o2.getBucketName())) {
return o1.getBucketName().compareTo(o2.getBucketName());
}
return o1.getPrefix().compareTo(o2.getPrefix());
},
LogicalOperator.EQUAL
)
)
).andReturn(list2).once();
URI segment1Uri = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, KEY_1 + "/"));
S3ObjectSummary objectSummary11 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1 + "/00000.smoosh", TIME_0);
S3ObjectSummary objectSummary12 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1 + "/meta.smoosh", TIME_0);

DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_1_PATH);
// One listObjectsV2 call per segment; no second (empty) page since first result has truncated=false
S3TestUtils.expectListObjects(s3Client, segment1Uri, ImmutableList.of(objectSummary11, objectSummary12));

s3Client.deleteObjects(EasyMock.cmp(
new DeleteObjectsRequest(TEST_BUCKET).withKeys(
KEY_1 + "/00000.smoosh",
KEY_1 + "/meta.smoosh",
KEY_2 + "/00000.smoosh",
KEY_2 + "/meta.smoosh"
),
(o1, o2) -> {
if (!o1.getBucketName().equals(o2.getBucketName())) {
return o1.getBucketName().compareTo(o2.getBucketName());
}

for (DeleteObjectsRequest.KeyVersion key : o1.getKeys()) {
boolean found = false;
for (DeleteObjectsRequest.KeyVersion key2 : o2.getKeys()) {
if (key.getKey().equals(key2.getKey())) {
found = true;
}
}
if (!found) {
return -1;
}
}
return 0;
},
LogicalOperator.EQUAL
));
EasyMock.expectLastCall().andVoid().once();

EasyMock.replay(
URI segment2Uri = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, KEY_2 + "/"));
S3ObjectSummary objectSummary21 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2 + "/00000.smoosh", TIME_1);
S3ObjectSummary objectSummary22 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2 + "/meta.smoosh", TIME_1);
S3TestUtils.expectListObjects(s3Client, segment2Uri, ImmutableList.of(objectSummary21, objectSummary22));


DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1 + "/00000.smoosh")
));
DeleteObjectsRequest deleteRequest1Meta = new DeleteObjectsRequest(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_1 + "/meta.smoosh")
));
DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_2 + "/00000.smoosh")
));
DeleteObjectsRequest deleteRequest2Meta = new DeleteObjectsRequest(TEST_BUCKET)
.withKeys(ImmutableList.of(
new DeleteObjectsRequest.KeyVersion(KEY_2 + "/meta.smoosh")
));

S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
segmentPusherConfig,
inputDataConfig,
objectSummary11,
objectSummary12,
list,
objectSummary21,
objectSummary22,
list2
ImmutableList.of(deleteRequest1, deleteRequest1Meta, deleteRequest2, deleteRequest2Meta),
ImmutableMap.of()
);

EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET).anyTimes();
EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX).anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS).anyTimes();

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(List.of(DATA_SEGMENT_1_NO_ZIP, DATA_SEGMENT_2_NO_ZIP));
EasyMock.verify(
EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig);
}

/**
* Verifies that when a segment directory has more than one page of objects (S3 returns truncated list),
* the killer paginates through all pages and deletes every object from deep storage.
*/
@Test
public void test_kill_not_zipped_pagination_deletesAllObjectsFromDeepStorage() throws SegmentLoadingException
{
final int firstPageSize = 1000;
final int secondPageSize = 100;

URI segment1Uri = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, KEY_1 + "/"));

// Create two pages of object summaries to simulate pagination
ImmutableList.Builder<S3ObjectSummary> firstPageBuilder = ImmutableList.builder();
ImmutableList.Builder<DeleteObjectsRequest.KeyVersion> allKeysBuilder = ImmutableList.builder();

for (int i = 0; i < firstPageSize; i++) {
String key = KEY_1 + "/file_" + i;
firstPageBuilder.add(S3TestUtils.newS3ObjectSummary(TEST_BUCKET, key, TIME_0));
allKeysBuilder.add(new DeleteObjectsRequest.KeyVersion(key));
}

for (int i = 0; i < secondPageSize; i++) {
String key = KEY_1 + "/file_" + (firstPageSize + i);
allKeysBuilder.add(new DeleteObjectsRequest.KeyVersion(key));
}

// Mock listObjectsV2 to return results with pagination
ListObjectsV2Result firstPage = new ListObjectsV2Result();
firstPage.getObjectSummaries().addAll(firstPageBuilder.build());
firstPage.setTruncated(true);
firstPage.setNextContinuationToken("token");

ImmutableList.Builder<S3ObjectSummary> secondPageBuilder = ImmutableList.builder();
for (int i = 0; i < secondPageSize; i++) {
String key = KEY_1 + "/file_" + (firstPageSize + i);
secondPageBuilder.add(S3TestUtils.newS3ObjectSummary(TEST_BUCKET, key, TIME_0));
}

ListObjectsV2Result secondPage = new ListObjectsV2Result();
secondPage.getObjectSummaries().addAll(secondPageBuilder.build());
secondPage.setTruncated(false);

EasyMock.expect(s3Client.listObjectsV2(S3TestUtils.matchListObjectsRequest(segment1Uri)))
.andReturn(firstPage)
.once();
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject()))
.andReturn(secondPage)
.once();

// Mock the delete operation - S3 batches deletes in chunks of 1000, so expect two delete calls
ImmutableList<DeleteObjectsRequest.KeyVersion> allKeys = allKeysBuilder.build();
DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET)
.withKeys(allKeys.subList(0, 1000));
DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET)
.withKeys(allKeys.subList(1000, 1100));

S3TestUtils.mockS3ClientDeleteObjects(
s3Client,
segmentPusherConfig,
inputDataConfig,
objectSummary11,
objectSummary12,
list,
objectSummary21,
objectSummary22,
list2
ImmutableList.of(deleteRequest1, deleteRequest2),
ImmutableMap.of()
);

EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(S3InputDataConfig.MAX_LISTING_LENGTH_MAX).anyTimes();

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);

segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(DATA_SEGMENT_1_NO_ZIP);

EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig);
}
}