Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added cs
Binary file not shown.
19 changes: 16 additions & 3 deletions scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,22 @@ object SCollection {
* iterable is empty. For a version that accepts empty iterables, see [[ScioContext#unionAll]].
*/
// `T: Coder` context bound is required since `scs` might be empty.
def unionAll[T: Coder](scs: Iterable[SCollection[T]]): SCollection[T] = {
  // The first element supplies the context that performs the union;
  // `head` fails if `scs` has no elements.
  val ctx = scs.head.context
  ctx.unionAll(scs)
}

def unionAll[T: Coder](scs: Iterable[SCollection[T]]): SCollection[T] = {
  // Convert to a List so the cases below can match on its shape.
  val inputs = scs.toList
  inputs match {
    case Nil =>
      throw new IllegalArgumentException(
        "SCollection.unionAll called with empty collection. Use ScioContext#unionAll for empty-safe operations."
      )
    // A single input is handed back unchanged; no Flatten step is applied.
    case only :: Nil => only
    case first :: _ =>
      val ctx = first.context
      // Read the transform name before the Flatten step is built.
      val name = ctx.tfName
      ctx.wrap(
        PCollectionList
          .of(inputs.map(_.internal).asJava)
          .apply(name, Flatten.pCollections())
      )
  }
}
/**
 * Implicit conversion that wraps an `SCollection[Double]` in a
 * [[DoubleSCollectionFunctions]] instance.
 */
implicit def makeDoubleSCollectionFunctions(
  s: SCollection[Double]
): DoubleSCollectionFunctions = {
  new DoubleSCollectionFunctions(s)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// An empty input must be rejected with a message that names ScioContext#unionAll.
"unionAll companion object" should "throw descriptive error on empty collection" in {
  val noInputs = Seq.empty[SCollection[Int]]
  val thrown = the[IllegalArgumentException] thrownBy {
    SCollection.unionAll(noInputs)
  }
  val message = thrown.getMessage
  message should include("ScioContext#unionAll")
}

// With exactly one input, the union must contain that input's elements.
"unionAll companion object" should "handle single collection" in {
  runWithContext { ctx =>
    val only = ctx.parallelize(Seq(1, 2, 3))
    val unioned = SCollection.unionAll(Seq(only))
    unioned should containInAnyOrder(Seq(1, 2, 3))
  }
}

// Three inputs must be merged into one collection holding every element.
"unionAll companion object" should "handle multiple collections" in {
  runWithContext { ctx =>
    val first = ctx.parallelize(Seq(1, 2))
    val second = ctx.parallelize(Seq(3, 4))
    val third = ctx.parallelize(Seq(5, 6))
    val inputs = Seq(first, second, third)
    val unioned = SCollection.unionAll(inputs)
    unioned should containInAnyOrder(Seq(1, 2, 3, 4, 5, 6))
  }
}
// The context-level union accepts an empty input and yields an empty collection.
"unionAll context method" should "handle empty collection safely" in {
  runWithContext { ctx =>
    val noInputs = Seq.empty[SCollection[Int]]
    val unioned = ctx.unionAll(noInputs)
    unioned should beEmpty
  }
}

// Two inputs must be merged into one collection holding every element.
"unionAll context method" should "handle multiple collections" in {
  runWithContext { ctx =>
    val first = ctx.parallelize(Seq(1, 2))
    val second = ctx.parallelize(Seq(3, 4))
    val inputs = Seq(first, second)
    val unioned = ctx.unionAll(inputs)
    unioned should containInAnyOrder(Seq(1, 2, 3, 4))
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Unioning an empty sequence through the context must give an empty collection.
"ScioContext.unionAll" should "handle empty collection safely" in {
  runWithContext { ctx =>
    val noInputs = Seq.empty[SCollection[Int]]
    val unioned = ctx.unionAll(noInputs)
    unioned should beEmpty
  }
}

// Unioning two collections through the context must keep every element of both.
"ScioContext.unionAll" should "handle multiple collections" in {
  runWithContext { ctx =>
    val first = ctx.parallelize(Seq(1, 2))
    val second = ctx.parallelize(Seq(3, 4))
    val inputs = Seq(first, second)
    val unioned = ctx.unionAll(inputs)
    unioned should containInAnyOrder(Seq(1, 2, 3, 4))
  }
}