Skip to content

Commit

Permalink
Update some doc
Browse files Browse the repository at this point in the history
Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman committed Nov 5, 2024
1 parent 369d5c6 commit b732084
Showing 1 changed file with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,12 @@ class JCudfTableOperator extends SerializedTableOperator[SerializedTableColumn]
}

/**
* Reader to coalesce columnar batches that are expected to contain only serialized
* tables T from shuffle. The serialized tables within are collected up to the target
* batch size and then concatenated on the host. Next try to send the concatenated
* result to GPU.
* Reader playing the same role as the combination of "HostCoalesceIteratorBase" and
* "GpuShuffleCoalesceIterator". That is to coalesce columnar batches expected to
* contain only serialized tables T from Shuffle. The serialized tables within are
* collected up to the target batch size and then concatenated them on the host.
* Next try to send the concatenated result to GPU.
*
* When OOM happens, it will reduce the target size by half, try to concatenate
* half of cached tables and send the result to GPU again.
*/
Expand Down Expand Up @@ -246,11 +248,12 @@ abstract class GpuShuffleCoalesceReaderBase[T <: AutoCloseable: ClassTag](
}

override def close(): Unit = if (!closed) {
serializedTables.foreach(_.close())
serializedTables.safeClose()
serializedTables.clear()
closed = true
}

/** Pull in batches from the input to make sure enough batches in the cache. */
private def pullNextBatch(): Boolean = {
if (closed) return false
// Always make sure enough data has been cached for the next batch.
Expand All @@ -273,6 +276,7 @@ abstract class GpuShuffleCoalesceReaderBase[T <: AutoCloseable: ClassTag](
serializedTables.nonEmpty
}

/** Collect batches that the total size is up to the given size from the cache. */
private def collectTablesForNextBatch(targetSize: Long): Array[T] = {
var curSize = 0L
var curRows = 0L
Expand Down Expand Up @@ -308,7 +312,7 @@ abstract class GpuShuffleCoalesceReaderBase[T <: AutoCloseable: ClassTag](
// 2) Concatenate the collected tables
// 3) Move the concatenated result to GPU
// We have to re-collect the tables and re-concatenate them, because the
// HostConcatResult can not be split into smaller pieces.
// coalesced result can not be split into smaller pieces.
val curTables = collectTablesForNextBatch(attemptSize.targetSize)
val concatHostBatch = withResource(new MetricRange(concatTimeMetric)) { _ =>
tableOperator.concatOnHost(curTables)
Expand All @@ -333,6 +337,11 @@ abstract class GpuShuffleCoalesceReaderBase[T <: AutoCloseable: ClassTag](
}
}

/**
* Prefetch the first bundle of serialized batches with the total size up to the
* "targetSize". The prefetched batches will be cached on host until the "next()"
* is called. This is for some optimization cases in joins.
*/
def prefetchHeadOnHost(): this.type = {
if (serializedTables.isEmpty) {
pullNextBatch()
Expand Down

0 comments on commit b732084

Please sign in to comment.