Skip to content

Commit

Permalink
Add destroy method to SourcePage
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Nov 4, 2024
1 parent fc56a68 commit 409599d
Show file tree
Hide file tree
Showing 12 changed files with 364 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.ObjLongConsumer;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class InputChannels
Expand Down Expand Up @@ -65,6 +66,10 @@ public List<Integer> getInputChannels()
return Collections.unmodifiableList(Ints.asList(inputChannels));
}

/**
* Returns a new {@link SourcePage} that wraps the given {@code page} and exposes only the input channels.
* Destroying the returned page will also destroy the given {@code page}.
*/
public SourcePage getInputChannels(SourcePage page)
{
return new InputChannelsSourcePage(page, inputChannels, eagerlyLoad);
Expand All @@ -81,9 +86,9 @@ public String toString()
private static final class InputChannelsSourcePage
implements SourcePage
{
private final SourcePage sourcePage;
private final int[] channels;
private final Block[] blocks;
private SourcePage sourcePage;
private int[] channels;
private Block[] blocks;

private InputChannelsSourcePage(SourcePage sourcePage, int[] channels, @Nullable boolean[] eagerlyLoad)
{
Expand All @@ -106,24 +111,34 @@ private InputChannelsSourcePage(SourcePage sourcePage, int[] channels, @Nullable
@Override
public int getPositionCount()
{
checkState(sourcePage != null, "page is destroyed");
return sourcePage.getPositionCount();
}

@Override
public long getSizeInBytes()
{
if (sourcePage == null) {
return 0;
}
return sourcePage.getSizeInBytes();
}

@Override
public long getRetainedSizeInBytes()
{
if (sourcePage == null) {
return 0;
}
return sourcePage.getRetainedSizeInBytes();
}

@Override
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
{
if (sourcePage == null) {
return;
}
for (Block block : blocks) {
if (block != null) {
block.retainedBytesForEachPart(consumer);
Expand All @@ -134,12 +149,14 @@ public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
@Override
public int getChannelCount()
{
checkState(sourcePage != null, "page is destroyed");
return blocks.length;
}

@Override
public Block getBlock(int channel)
{
checkState(sourcePage != null, "page is destroyed");
Block block = blocks[channel];
if (block == null) {
block = sourcePage.getBlock(channels[channel]);
Expand All @@ -151,6 +168,7 @@ public Block getBlock(int channel)
@Override
public Page getPage()
{
checkState(sourcePage != null, "page is destroyed");
for (int i = 0; i < blocks.length; i++) {
getBlock(i);
}
Expand All @@ -160,6 +178,7 @@ public Page getPage()
@Override
public Page getColumns(int[] channels)
{
checkState(sourcePage != null, "page is destroyed");
Block[] blocks = new Block[channels.length];
for (int i = 0; i < channels.length; i++) {
blocks[i] = getBlock(channels[i]);
Expand All @@ -170,6 +189,7 @@ public Page getColumns(int[] channels)
@Override
public void selectPositions(int[] positions, int offset, int size)
{
checkState(sourcePage != null, "page is destroyed");
sourcePage.selectPositions(positions, offset, size);
for (int i = 0; i < blocks.length; i++) {
Block block = blocks[i];
Expand All @@ -178,5 +198,22 @@ public void selectPositions(int[] positions, int offset, int size)
}
}
}

@Override
public void destroy()
{
if (sourcePage != null) {
sourcePage.destroy();
sourcePage = null;
}
blocks = null;
channels = null;
}

@Override
public boolean isDestroyed()
{
return sourcePage == null || sourcePage.isDestroyed();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.function.ObjLongConsumer;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class TestingSourcePage
Expand All @@ -29,6 +30,7 @@ public class TestingSourcePage
private final int positionCount;
private final Block[] blocks;
private final boolean[] loaded;
private boolean destroyed;

public TestingSourcePage(int positionCount, Block... blocks)
{
Expand All @@ -40,12 +42,16 @@ public TestingSourcePage(int positionCount, Block... blocks)
@Override
public int getPositionCount()
{
checkState(!destroyed, "page is destroyed");
return positionCount;
}

@Override
public long getSizeInBytes()
{
if (destroyed) {
return 0;
}
long sizeInBytes = 0;
for (Block block : blocks) {
if (block != null) {
Expand All @@ -58,6 +64,9 @@ public long getSizeInBytes()
@Override
public long getRetainedSizeInBytes()
{
if (destroyed) {
return 0;
}
long retainedSizeInBytes = 0;
for (Block block : blocks) {
if (block != null) {
Expand All @@ -70,6 +79,9 @@ public long getRetainedSizeInBytes()
@Override
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
{
if (destroyed) {
return;
}
for (Block block : blocks) {
if (block != null) {
block.retainedBytesForEachPart(consumer);
Expand All @@ -80,6 +92,7 @@ public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
@Override
public int getChannelCount()
{
checkState(!destroyed, "page is destroyed");
return blocks.length;
}

Expand All @@ -91,6 +104,7 @@ public boolean wasLoaded(int channel)
@Override
public Block getBlock(int channel)
{
checkState(!destroyed, "page is destroyed");
Block block = blocks[channel];
checkArgument(block != null, "Block %s should not be accessed", channel);
loaded[channel] = true;
Expand All @@ -100,6 +114,7 @@ public Block getBlock(int channel)
@Override
public Page getPage()
{
checkState(!destroyed, "page is destroyed");
for (Block block : blocks) {
checkArgument(block != null, "Page cannot be created because block is null");
}
Expand All @@ -111,11 +126,24 @@ public Page getPage()
@Override
public void selectPositions(int[] positions, int offset, int size)
{
checkState(!destroyed, "page is destroyed");
for (int i = 0; i < blocks.length; i++) {
Block block = blocks[i];
if (block != null) {
blocks[i] = block.getPositions(positions, offset, size);
}
}
}

@Override
public void destroy()
{
destroyed = true;
}

@Override
public boolean isDestroyed()
{
return destroyed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ final class NoChannelsSourcePage
@Override
public int getPositionCount()
{
if (positionCount < 0) {
throw new IllegalStateException("page is destroyed");
}
return positionCount;
}

Expand All @@ -56,6 +59,9 @@ public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer) {}
@Override
public int getChannelCount()
{
if (positionCount < 0) {
throw new IllegalStateException("page is destroyed");
}
return 0;
}

Expand All @@ -68,12 +74,18 @@ public Block getBlock(int channel)
@Override
public Page getPage()
{
if (positionCount < 0) {
throw new IllegalStateException("page is destroyed");
}
return new Page(positionCount);
}

@Override
public void selectPositions(int[] positions, int offset, int size)
{
if (positionCount < 0) {
throw new IllegalStateException("page is destroyed");
}
if (size > positionCount) {
throw new IllegalArgumentException("Page has no channels");
}
Expand All @@ -83,4 +95,16 @@ public void selectPositions(int[] positions, int offset, int size)
}
positionCount = size;
}

@Override
public void destroy()
{
positionCount = -1;
}

@Override
public boolean isDestroyed()
{
return positionCount == -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,22 @@ static SourcePage create(Page page)

/**
* Gets the current loaded size of the page in bytes.
* <p>
* This method will return 0 if the page has been destroyed.
*/
long getSizeInBytes();

/**
* Gets the current retained size of the page in bytes.
* <p>
* This method is callable even if the page has been destroyed.
*/
long getRetainedSizeInBytes();

/**
* Calls retainedBytesForEachPart on all loaded blocks;
* <p>
* If the page has been destroyed, this method is a no-op.
*/
void retainedBytesForEachPart(ObjLongConsumer<Object> consumer);

Expand Down Expand Up @@ -107,4 +113,19 @@ default Page getColumns(int[] channels)
* the underlying reader to filter positions on subsequent reads.
*/
void selectPositions(int[] positions, int offset, int size);

/**
* Destroy this page and release any resources it holds.
* <p>
* This method should be called when the page is no longer needed.
* Once this method is called, the page should not be used, and all
* method calls on the page should throw an exception, unless noted otherwise.
* This method is idempotent and can be called multiple times.
*/
void destroy();

/**
* Returns true if this page has been destroyed.
*/
boolean isDestroyed();
}
Loading

0 comments on commit 409599d

Please sign in to comment.