From 91cc877e2697398163658a825e6b8cea76abcfd7 Mon Sep 17 00:00:00 2001 From: Jan Chyb Date: Thu, 15 Dec 2022 13:41:18 +0100 Subject: [PATCH] Try to compile and run scio-test tests on Scala 3 * Since the module depends on scio-avro (not yet being compiled for Scala 3), some test suites that depend on it were moved to a scala-2 only module. * Some individual tests that were failing to compile were commented out for now - most of them test magnolia-based Coder derivation, which is only implemented for case classes for now --- .../com/spotify/scio/testing/JobTest.scala | 2 +- .../scio/testing/SCollectionMatchers.scala | 10 +- .../scio/testing/TransformOverride.scala | 4 +- .../com/spotify/scio/avro/AvroUtils.scala | 0 .../spotify/scio/coders/AvroCoderTest.scala | 0 .../scio/coders/KryoAtomicCoderTest.scala | 0 .../spotify/scio/hash/ApproxFilterTest.scala | 0 .../hash/MutableScalableBloomFilterTest.scala | 0 .../scio/hash/PartitionSettingsTest.scala | 0 .../com/spotify/scio/io/FileFormatTest.scala | 0 .../com/spotify/scio/io/ScioIOTest.scala | 0 .../com/spotify/scio/io/TapTest.scala | 0 .../scio/io/dynamic/DynamicFileTest.scala | 0 .../spotify/scio/testing/JobTestTest.scala | 2 +- .../spotify/scio/util/ProtobufUtilTest.scala | 0 .../spotify/scio/values/DistCacheTest.scala | 0 .../values/PairSCollectionFunctionsTest.scala | 0 .../com/spotify/scio/coders/CoderTest.scala | 257 +++++++++--------- .../scio/schemas/SchemaMaterializerTest.scala | 6 +- .../transforms/FileDownloadDoFnTest.scala | 6 +- .../com/spotify/scio/util/MultiJoinTest.scala | 10 +- .../com/spotify/scio/values/ClosureTest.scala | 2 +- .../PairHashSCollectionFunctionsTest.scala | 14 +- .../PairSkewedSCollectionFunctionsTest.scala | 20 +- .../spotify/scio/values/SCollectionTest.scala | 7 + .../values/SCollectionWithSideInputTest.scala | 4 +- 26 files changed, 181 insertions(+), 163 deletions(-) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/avro/AvroUtils.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/coders/AvroCoderTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/coders/KryoAtomicCoderTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/hash/ApproxFilterTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/hash/MutableScalableBloomFilterTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/hash/PartitionSettingsTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/io/FileFormatTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/io/ScioIOTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/io/TapTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/io/dynamic/DynamicFileTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/testing/JobTestTest.scala (99%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/util/ProtobufUtilTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/values/DistCacheTest.scala (100%) rename scio-test/src/test/{scala => scala-2}/com/spotify/scio/values/PairSCollectionFunctionsTest.scala (100%) diff --git a/scio-test/src/main/scala/com/spotify/scio/testing/JobTest.scala b/scio-test/src/main/scala/com/spotify/scio/testing/JobTest.scala index a3efce8875..6d90e131c2 100644 --- a/scio-test/src/main/scala/com/spotify/scio/testing/JobTest.scala +++ b/scio-test/src/main/scala/com/spotify/scio/testing/JobTest.scala @@ -71,7 +71,7 @@ import scala.util.control.NonFatal object JobTest { case class BeamOptions(opts: List[String]) - private case class BuilderState( + private[JobTest] case class BuilderState( className: String, cmdlineArgs: Array[String] = Array(), input: Map[String, JobInputSource[_]] = Map.empty, diff --git a/scio-test/src/main/scala/com/spotify/scio/testing/SCollectionMatchers.scala b/scio-test/src/main/scala/com/spotify/scio/testing/SCollectionMatchers.scala index 5ff69f79d2..06f1c37080 100644 --- a/scio-test/src/main/scala/com/spotify/scio/testing/SCollectionMatchers.scala +++ b/scio-test/src/main/scala/com/spotify/scio/testing/SCollectionMatchers.scala @@ -188,8 +188,7 @@ trait SCollectionMatchers extends EqInstances { import ScioMatchers.makeFn - sealed trait MatcherBuilder[T] { - _: Matcher[T] => + sealed trait MatcherBuilder[T] { self: Matcher[T] => type From type To >: From @@ -197,7 +196,7 @@ trait SCollectionMatchers extends EqInstances { def matcher(builder: AssertBuilder): Matcher[T] - def matcher: Matcher[T] = matcher(identity) + def matcher: Matcher[T] = (t: T) => matcher(t) } sealed trait IterableMatcher[T, B] extends MatcherBuilder[T] with Matcher[T] { @@ -278,7 +277,7 @@ trait SCollectionMatchers extends EqInstances { * SCollection assertion only applied to the specified window, running the checker only on the * final pane for each key. */ - def inFinalPane[T: ClassTag, B: ClassTag]( + def inFinalPane[T]( window: BoundedWindow )(matcher: MatcherBuilder[T]): Matcher[T] = matcher match { @@ -292,7 +291,7 @@ trait SCollectionMatchers extends EqInstances { * SCollection assertion only applied to the specified window, running the checker only on the * late pane for each key. */ - def inLatePane[T: ClassTag, B: ClassTag]( + def inLatePane[T]( window: BoundedWindow )(matcher: MatcherBuilder[T]): Matcher[T] = matcher match { @@ -439,6 +438,7 @@ trait SCollectionMatchers extends EqInstances { override def matcher(builder: AssertBuilder): Matcher[SCollection[(K, V)]] = new Matcher[SCollection[(K, V)]] { override def apply(left: SCollection[(K, V)]): MatchResult = { + import com.spotify.scio.values.SCollection.makePairSCollectionFunctions val assertion = builder(PAssert.thatMap(serDeCycle(left).toKV.internal)) m( () => assertion.isEqualTo(value.asJava), diff --git a/scio-test/src/main/scala/com/spotify/scio/testing/TransformOverride.scala b/scio-test/src/main/scala/com/spotify/scio/testing/TransformOverride.scala index 6554ad05aa..44fdcc227c 100644 --- a/scio-test/src/main/scala/com/spotify/scio/testing/TransformOverride.scala +++ b/scio-test/src/main/scala/com/spotify/scio/testing/TransformOverride.scala @@ -103,7 +103,7 @@ object TransformOverride { * with a transform mapping elements via `fn`. */ def of[T: ClassTag, U](name: String, fn: T => U): PTransformOverride = { - val wrappedFn: T => U = fn.compose { t: T => + val wrappedFn: T => U = fn.compose { (t: T) => typeValidation( s"Input for override transform $name does not match pipeline transform.", t.getClass, @@ -130,7 +130,7 @@ object TransformOverride { */ def ofIter[T: ClassTag, U](name: String, fn: T => Iterable[U]): PTransformOverride = { val wrappedFn: T => JIterable[U] = fn - .compose { t: T => + .compose { (t: T) => typeValidation( s"Input for override transform $name does not match pipeline transform.", t.getClass, diff --git a/scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala b/scio-test/src/test/scala-2/com/spotify/scio/avro/AvroUtils.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala rename to scio-test/src/test/scala-2/com/spotify/scio/avro/AvroUtils.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/coders/AvroCoderTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/coders/AvroCoderTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/coders/AvroCoderTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/coders/AvroCoderTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/coders/KryoAtomicCoderTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/coders/KryoAtomicCoderTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/coders/KryoAtomicCoderTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/coders/KryoAtomicCoderTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/hash/ApproxFilterTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/hash/ApproxFilterTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/hash/ApproxFilterTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/hash/ApproxFilterTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/hash/MutableScalableBloomFilterTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/hash/MutableScalableBloomFilterTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/hash/MutableScalableBloomFilterTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/hash/MutableScalableBloomFilterTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/hash/PartitionSettingsTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/hash/PartitionSettingsTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/hash/PartitionSettingsTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/hash/PartitionSettingsTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/io/FileFormatTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/io/FileFormatTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/io/FileFormatTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/io/FileFormatTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/io/ScioIOTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/io/ScioIOTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/io/ScioIOTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/io/ScioIOTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/io/TapTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/io/TapTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/io/TapTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/io/TapTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/io/dynamic/DynamicFileTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/io/dynamic/DynamicFileTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/io/dynamic/DynamicFileTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/io/dynamic/DynamicFileTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/testing/JobTestTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/testing/JobTestTest.scala similarity index 99% rename from scio-test/src/test/scala/com/spotify/scio/testing/JobTestTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/testing/JobTestTest.scala index e8d94df4bc..10075aa8e6 100644 --- a/scio-test/src/test/scala/com/spotify/scio/testing/JobTestTest.scala +++ b/scio-test/src/test/scala-2/com/spotify/scio/testing/JobTestTest.scala @@ -168,7 +168,7 @@ object TransformOverrideIterJob { .map(_.toInt) // #JobTestTest_example_iter .withName("myTransform") - .transform { c: SCollection[Int] => + .transform { (c: SCollection[Int]) => c.applyTransform(ParDo.of(new GuavaLookupDoFn)) .flatMap(_.getValue.get()) .map(_.toString) diff --git a/scio-test/src/test/scala/com/spotify/scio/util/ProtobufUtilTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/util/ProtobufUtilTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/util/ProtobufUtilTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/util/ProtobufUtilTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/values/DistCacheTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/values/DistCacheTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/values/DistCacheTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/values/DistCacheTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala b/scio-test/src/test/scala-2/com/spotify/scio/values/PairSCollectionFunctionsTest.scala similarity index 100% rename from scio-test/src/test/scala/com/spotify/scio/values/PairSCollectionFunctionsTest.scala rename to scio-test/src/test/scala-2/com/spotify/scio/values/PairSCollectionFunctionsTest.scala diff --git a/scio-test/src/test/scala/com/spotify/scio/coders/CoderTest.scala b/scio-test/src/test/scala/com/spotify/scio/coders/CoderTest.scala index 90a325b1f8..88928e0769 100644 --- a/scio-test/src/test/scala/com/spotify/scio/coders/CoderTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/coders/CoderTest.scala @@ -154,8 +154,9 @@ final class CoderTest extends AnyFlatSpec with Matchers { Some(1) coderShould notFallback() BitSet(1 to 100000: _*) coderShould notFallback() - Right(1) coderShould notFallback() - Left(1) coderShould notFallback() + // TODO migration: had to be annotated to correctly resolve implicits + (Right(1): Either[Int, Int]) coderShould notFallback() + (Left(1): Either[Int, Int]) coderShould notFallback() mut.Set(s: _*) coderShould notFallback() val bsc = CoderMaterializer.beamWithDefault(Coder[Seq[String]]) @@ -168,55 +169,56 @@ final class CoderTest extends AnyFlatSpec with Matchers { CoderProperties.structuralValueConsistentWithEquals(bmc, m, m) } - it should "support tuples" in { - import shapeless.syntax.std.tuple._ - val t22 = ( - 42, - "foo", - 4.2d, - 4.2f, - 'a', - List(1, 2, 3, 4, 5), - 42, - "foo", - 4.2d, - 4.2f, - 'a', - List(1, 2, 3, 4, 5), - 42, - "foo", - 4.2d, - 4.2f, - 'a', - List(1, 2, 3, 4, 5), - 42, - "foo", - 4.2d, - 4.2f - ) - - t22.take(2) coderShould roundtrip() - t22.take(3) coderShould roundtrip() - t22.take(4) coderShould roundtrip() - t22.take(5) coderShould roundtrip() - t22.take(6) coderShould roundtrip() - t22.take(7) coderShould roundtrip() - t22.take(8) coderShould roundtrip() - t22.take(9) coderShould roundtrip() - t22.take(10) coderShould roundtrip() - t22.take(11) coderShould roundtrip() - t22.take(12) coderShould roundtrip() - t22.take(13) coderShould roundtrip() - t22.take(14) coderShould roundtrip() - t22.take(15) coderShould roundtrip() - t22.take(16) coderShould roundtrip() - t22.take(17) coderShould roundtrip() - t22.take(18) coderShould roundtrip() - t22.take(19) coderShould roundtrip() - t22.take(20) coderShould roundtrip() - t22.take(21) coderShould roundtrip() - t22.take(22) coderShould roundtrip() - } + // TODO migration: does not compile yet + // it should "support tuples" in { //TODO scala3 + // // import shapeless.syntax.std.tuple._ + // val t22 = ( + // 42, + // "foo", + // 4.2d, + // 4.2f, + // 'a', + // List(1, 2, 3, 4, 5), + // 42, + // "foo", + // 4.2d, + // 4.2f, + // 'a', + // List(1, 2, 3, 4, 5), + // 42, + // "foo", + // 4.2d, + // 4.2f, + // 'a', + // List(1, 2, 3, 4, 5), + // 42, + // "foo", + // 4.2d, + // 4.2f + // ) + + // t22.take(2) coderShould roundtrip() + // t22.take(3) coderShould roundtrip() + // t22.take(4) coderShould roundtrip() + // t22.take(5) coderShould roundtrip() + // t22.take(6) coderShould roundtrip() + // t22.take(7) coderShould roundtrip() + // t22.take(8) coderShould roundtrip() + // t22.take(9) coderShould roundtrip() + // t22.take(10) coderShould roundtrip() + // t22.take(11) coderShould roundtrip() + // t22.take(12) coderShould roundtrip() + // t22.take(13) coderShould roundtrip() + // t22.take(14) coderShould roundtrip() + // t22.take(15) coderShould roundtrip() + // t22.take(16) coderShould roundtrip() + // t22.take(17) coderShould roundtrip() + // t22.take(18) coderShould roundtrip() + // t22.take(19) coderShould roundtrip() + // t22.take(20) coderShould roundtrip() + // t22.take(21) coderShould roundtrip() + // t22.take(22) coderShould roundtrip() + // } it should "have a Coder for Nothing" in { val bnc = CoderMaterializer.beamWithDefault[Nothing](Coder[Nothing]) @@ -266,12 +268,13 @@ final class CoderTest extends AnyFlatSpec with Matchers { coderIsSerializable[com.spotify.scio.avro.User] coderIsSerializable[NestedA] coderIsSerializable[Top] - coderIsSerializable[SampleField] + // coderIsSerializable[SampleField] // TODO migration: does not compile yet } - it should "support Avro's SpecificRecordBase" in { - Avro.user coderShould notFallback() - } + // TODO migration: does not compile yet + // it should "support Avro's SpecificRecordBase" in { + // Avro.user coderShould notFallback() + // } it should "support Avro's GenericRecord" in { val schema = Avro.user.getSchema @@ -370,10 +373,11 @@ final class CoderTest extends AnyFlatSpec with Matchers { .foreach(r => r coderShould notFallback()) } - it should "Serialize objects" in { - TestObject coderShould notFallback() - TestObject1 coderShould notFallback() - } + // TODO migration: does not compile yet + // it should "Serialize objects" in { // TODO no magnolia derivation support + // TestObject coderShould notFallback() + // TestObject1 coderShould notFallback() + // } it should "only derive Coder if no coder exists" in { CaseClassWithExplicitCoder(1, "hello") coderShould notFallback() @@ -386,15 +390,17 @@ final class CoderTest extends AnyFlatSpec with Matchers { record coderShould fallback() } - it should "support classes with private constructors" in { - Coder.gen[PrivateClass] - PrivateClass(42L) coderShould fallback() - } + // TODO migration: does not compile yet + // it should "support classes with private constructors" in { // TODO no magnolia derivation support + // Coder.derive[PrivateClass] + // PrivateClass(42L) coderShould fallback() + // } - it should "support classes that contain classes with private constructors" in { - Coder.gen[UsesPrivateClass] - UsesPrivateClass(PrivateClass(1L)) coderShould notFallback() - } + // TODO migration: does not compile yet + // it should "support classes that contain classes with private constructors" in { // TODO no magnolia derivation support + // Coder.derive[UsesPrivateClass] + // UsesPrivateClass(PrivateClass(1L)) coderShould notFallback() + // } it should "not derive Coders for org.apache.beam.sdk.values.Row" in { "Coder[Row]" shouldNot compile @@ -569,27 +575,27 @@ final class CoderTest extends AnyFlatSpec with Matchers { it should "support derivation of recursive types" in { noException should be thrownBy SerializableUtils.serializeToByteArray(CoderMaterializer.beamWithDefault(Coder[Top])) + // TODO migration: does not compile yet + // noException should be thrownBy + // SerializableUtils.serializeToByteArray( + // CoderMaterializer.beamWithDefault(Coder[SampleFieldType]) + // ) - noException should be thrownBy - SerializableUtils.serializeToByteArray( - CoderMaterializer.beamWithDefault(Coder[SampleFieldType]) - ) + // "Coder[SampleField]" should compile + // "Coder[SampleFieldType]" should compile - "Coder[SampleField]" should compile - "Coder[SampleFieldType]" should compile - - SampleField("hello", StringType) coderShould roundtrip() + // SampleField("hello", StringType) coderShould roundtrip() // https://github.com/spotify/scio/issues/3707 - SampleField("hello", StringType) coderShould beConsistentWithEquals() - SampleField("hello", StringType) coderShould beDeterministic() + // SampleField("hello", StringType) coderShould beConsistentWithEquals() + // SampleField("hello", StringType) coderShould beDeterministic() - SampleField( - "hello", - RecordType( - List(SampleField("record", RecordType(List.empty)), SampleField("int", IntegerType)) - ) - ) coderShould roundtrip() + // SampleField( + // "hello", + // RecordType( + // List(SampleField("record", RecordType(List.empty)), SampleField("int", IntegerType)) + // ) + // ) coderShould roundtrip() } it should "#2595: work with parameterized types" in { @@ -599,9 +605,9 @@ final class CoderTest extends AnyFlatSpec with Matchers { c.encode(Example(Right("str"), Right(0L)), System.out) } - it should "#2467 support derivation of directly recursive types" in { - Recursive(1, Option(Recursive(2, None))) coderShould roundtrip() - } + // it should "#2467 support derivation of directly recursive types" in { // TODO migration: does not work yet + // Recursive(1, Option(Recursive(2, None))) coderShould roundtrip() + // } it should "#2644 verifyDeterministic throw a NonDeterministicException exception for Set" in { an[NonDeterministicException] should be thrownBy { @@ -623,15 +629,17 @@ final class CoderTest extends AnyFlatSpec with Matchers { tableSchema coderShould roundtrip() } - it should "optimize for AnyVal" in { - coderIsSerializable[AnyValExample] - Coder[AnyValExample] shouldBe a[Transform[_, _]] - } + // TODO migration: does not compile yet + // it should "optimize for AnyVal" in { // TODO no magnolia derivation support + // coderIsSerializable[AnyValExample] + // Coder[AnyValExample] shouldBe a[Transform[_, _]] + // } - it should "optimize for objects" in { - coderIsSerializable[TestObject.type] - Coder[TestObject.type] shouldBe a[Singleton[_]] - } + // TODO migration: does not compile yet + // it should "optimize for objects" in { // TODO no magnolia derivation support + // coderIsSerializable[TestObject.type] + // Coder[TestObject.type] shouldBe a[Singleton[_]] + // } it should "support Algebird's Moments" in { coderIsSerializable[Moments] @@ -679,34 +687,35 @@ final class CoderTest extends AnyFlatSpec with Matchers { bloomFilter coderShould beDeterministic() } - it should "not serialize any magnolia internals after materialization" in { - class ObjectOutputStreamInspector - extends ObjectOutputStream(NullOutputStream.NULL_OUTPUT_STREAM) { - private val classes = Set.newBuilder[String] - - override def writeClassDescriptor(desc: ObjectStreamClass): Unit = { - classes += desc.getName - super.writeClassDescriptor(desc) - } - - def serializedClasses: Set[String] = { - super.flush() - super.close() - classes.result() - } - } - - val inspector = new ObjectOutputStreamInspector() - // case class - inspector.writeObject(CoderMaterializer.beamWithDefault(Coder[DummyCC])) - // sealed trait - inspector.writeObject(CoderMaterializer.beamWithDefault(Coder[Top])) - - inspector.serializedClasses should not contain oneOf( - classOf[magnolia1.CaseClass[Coder, _]].getName, - classOf[magnolia1.Param[Coder, _]].getName, - classOf[magnolia1.SealedTrait[Coder, _]].getName, - classOf[magnolia1.Subtype[Coder, _]].getName - ) - } + // TODO migration: does not compile yet + // it should "not serialize any magnolia internals after materialization" in { // TODO ignore for now + // class ObjectOutputStreamInspector + // extends ObjectOutputStream(NullOutputStream.NULL_OUTPUT_STREAM) { + // private val classes = Set.newBuilder[String] + + // override def writeClassDescriptor(desc: ObjectStreamClass): Unit = { + // classes += desc.getName + // super.writeClassDescriptor(desc) + // } + + // def serializedClasses: Set[String] = { + // super.flush() + // super.close() + // classes.result() + // } + // } + + // val inspector = new ObjectOutputStreamInspector() + // // case class + // inspector.writeObject(CoderMaterializer.beamWithDefault(Coder[DummyCC])) + // // sealed trait + // inspector.writeObject(CoderMaterializer.beamWithDefault(Coder[Top])) + + // inspector.serializedClasses should not contain oneOf( + // classOf[magnolia1.CaseClass[Coder, _]].getName, + // classOf[magnolia1.Param[Coder, _]].getName, + // classOf[magnolia1.SealedTrait[Coder, _]].getName, + // classOf[magnolia1.Subtype[Coder, _]].getName + // ) + // } } diff --git a/scio-test/src/test/scala/com/spotify/scio/schemas/SchemaMaterializerTest.scala b/scio-test/src/test/scala/com/spotify/scio/schemas/SchemaMaterializerTest.scala index 1c1246e84f..334630c3c2 100644 --- a/scio-test/src/test/scala/com/spotify/scio/schemas/SchemaMaterializerTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/schemas/SchemaMaterializerTest.scala @@ -73,13 +73,13 @@ final class SchemaMaterializerTest extends AnyFlatSpec with Matchers { FieldType.DECIMAL ) fieldTypes(Schema[java.lang.Boolean]).headOption.map(_.getType) shouldBe Some(FieldType.BOOLEAN) - fieldTypes(Schema[java.util.List[String]]).headOption.map(_.getType) shouldBe Some( + fieldTypes(Schema[java.util.List[String]](Schema.jListSchema)).headOption.map(_.getType) shouldBe Some( FieldType.array(FieldType.STRING) ) - fieldTypes(Schema[java.util.ArrayList[String]]).headOption.map(_.getType) shouldBe Some( + fieldTypes(Schema[java.util.ArrayList[String]](Schema.jArrayListSchema(Schema.stringSchema))).headOption.map(_.getType) shouldBe Some( FieldType.array(FieldType.STRING) ) - fieldTypes(Schema[java.util.Map[String, String]]).headOption.map(_.getType) shouldBe Some( + fieldTypes(Schema[java.util.Map[String, String]](Schema.jMapSchema)).headOption.map(_.getType) shouldBe Some( FieldType.map(FieldType.STRING, FieldType.STRING) ) diff --git a/scio-test/src/test/scala/com/spotify/scio/transforms/FileDownloadDoFnTest.scala b/scio-test/src/test/scala/com/spotify/scio/transforms/FileDownloadDoFnTest.scala index 321f1339ed..48abf63799 100644 --- a/scio-test/src/test/scala/com/spotify/scio/transforms/FileDownloadDoFnTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/transforms/FileDownloadDoFnTest.scala @@ -32,7 +32,7 @@ class FileDownloadDoFnTest extends PipelineSpec { runWithContext { sc => val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn) p.keys should containInAnyOrder((1 to 100).map(_.toString)) - p.values.distinct should forAll { f: Path => !Files.exists(f) } + p.values.distinct should forAll { (f: Path) => !Files.exists(f) } } files.foreach(Files.delete) Files.delete(tmpDir) @@ -44,7 +44,7 @@ class FileDownloadDoFnTest extends PipelineSpec { runWithContext { sc => val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn, 10, false) p.keys should containInAnyOrder((1 to 100).map(_.toString)) - p.values.distinct should forAll { f: Path => !Files.exists(f) } + p.values.distinct should forAll { (f: Path) => !Files.exists(f) } } files.foreach(Files.delete) Files.delete(tmpDir) @@ -56,7 +56,7 @@ class FileDownloadDoFnTest extends PipelineSpec { runWithContext { sc => val p = sc.parallelize(files.map(_.toUri)).flatMapFile(fn, 10, true) p.keys should containInAnyOrder((1 to 100).map(_.toString)) - p.values.distinct should forAll { f: Path => + p.values.distinct should forAll { (f: Path) => val r = Files.exists(f) if (r) { Files.delete(f) diff --git a/scio-test/src/test/scala/com/spotify/scio/util/MultiJoinTest.scala b/scio-test/src/test/scala/com/spotify/scio/util/MultiJoinTest.scala index ba3fd99ba0..9be58280a8 100644 --- a/scio-test/src/test/scala/com/spotify/scio/util/MultiJoinTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/util/MultiJoinTest.scala @@ -59,7 +59,7 @@ class MultiJoinTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("d", 14))) val p = MultiJoin.outer(p1, p2) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq( ("a", (Some(1), Some(11))), ("b", (Some(2), Some(12))), @@ -75,7 +75,7 @@ class MultiJoinTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13), ("d", 14))) val p = MultiJoin.outer(p1, p2) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq( ("a", (Some(1), Some(11))), ("a", (Some(2), Some(11))), @@ -93,7 +93,7 @@ class MultiJoinTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("d", 14))) val p = MultiJoin(p1, p2) - p should containInAnyOrder(Seq(("a", (1, 11)), ("b", (2, 12)))) + p should containInAnyOrder[(String, (Int, Int))](Seq(("a", (1, 11)), ("b", (2, 12)))) } } @@ -102,7 +102,7 @@ class MultiJoinTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("d", 14))) val p = MultiJoin.left(p1, p2) - p should containInAnyOrder(Seq(("a", (1, Some(11))), ("b", (2, Some(12))), ("c", (3, None)))) + p should containInAnyOrder[(String, (Int, Option[Int]))](Seq(("a", (1, Some(11))), ("b", (2, Some(12))), ("c", (3, None)))) } } @@ -111,7 +111,7 @@ class MultiJoinTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13), ("d", 14))) val p = MultiJoin.left(p1, p2) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Int, Option[Int]))]( Seq( ("a", (1, Some(11))), ("a", (2, Some(11))), diff --git a/scio-test/src/test/scala/com/spotify/scio/values/ClosureTest.scala b/scio-test/src/test/scala/com/spotify/scio/values/ClosureTest.scala index c555917c09..58391fd0c8 100644 --- a/scio-test/src/test/scala/com/spotify/scio/values/ClosureTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/values/ClosureTest.scala @@ -140,7 +140,7 @@ class NestedClosuresNotSerializable { @nowarn("msg=local method x in method getMapFn is never used") def x = irrelevantInt def y = 2 - val fn = { a: Int => a + y } + val fn = { (a: Int) => a + y } fn } } diff --git a/scio-test/src/test/scala/com/spotify/scio/values/PairHashSCollectionFunctionsTest.scala b/scio-test/src/test/scala/com/spotify/scio/values/PairHashSCollectionFunctionsTest.scala index 53096f6b90..33e4739cb1 100644 --- a/scio-test/src/test/scala/com/spotify/scio/values/PairHashSCollectionFunctionsTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/values/PairHashSCollectionFunctionsTest.scala @@ -88,7 +88,7 @@ class PairHashSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("d", 14))) val p = p1.hashLeftOuterJoin(p2) - p should containInAnyOrder(Seq(("a", (1, Some(11))), ("b", (2, Some(12))), ("c", (3, None)))) + p should containInAnyOrder[(String, (Int, Option[Int]))](Seq(("a", (1, Some(11))), ("b", (2, Some(12))), ("c", (3, None)))) } } @@ -107,7 +107,7 @@ class PairHashSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13))) val p = p1.hashLeftOuterJoin(p2) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Int, Option[Int]))]( Seq( ("a", (1, Some(11))), ("a", (2, Some(11))), @@ -124,7 +124,7 @@ class PairHashSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("d", 14))).asMultiMapSideInput val p = p1.hashLeftOuterJoin(p2) - p should containInAnyOrder(Seq(("a", (1, Some(11))), ("b", (2, Some(12))), ("c", (3, None)))) + p should containInAnyOrder[(String, (Int, Option[Int]))](Seq(("a", (1, Some(11))), ("b", (2, Some(12))), ("c", (3, None)))) } } @@ -133,7 +133,7 @@ class PairHashSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("b", 2))) val p2 = sc.parallelize(Seq(("a", 11), ("c", 13))) val p = p1.hashFullOuterJoin(p2) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq(("a", (Some(1), Some(11))), ("b", (Some(2), None)), ("c", (None, Some(13)))) ) } @@ -155,7 +155,7 @@ class PairHashSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13), ("d", 14))) val p = p1.hashFullOuterJoin(p2) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq( ("a", (Some(1), Some(11))), ("a", (Some(2), Some(11))), @@ -173,7 +173,7 @@ class PairHashSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1))) val p2 = sc.parallelize(Seq(("b", 2))) val p = p1.hashFullOuterJoin(p2) - p should containInAnyOrder(Seq(("a", (Some(1), None)), ("b", (None, Some(2))))) + p should containInAnyOrder[(String, (Option[Int], Option[Int]))](Seq(("a", (Some(1), None)), ("b", (None, Some(2))))) } } @@ -182,7 +182,7 @@ class PairHashSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("b", 2))) val p2 = sc.parallelize(Seq(("a", 11), ("c", 13))).asMultiMapSideInput val p = p1.hashFullOuterJoin(p2) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq(("a", (Some(1), Some(11))), ("b", (Some(2), None)), ("c", (None, Some(13)))) ) } diff --git a/scio-test/src/test/scala/com/spotify/scio/values/PairSkewedSCollectionFunctionsTest.scala b/scio-test/src/test/scala/com/spotify/scio/values/PairSkewedSCollectionFunctionsTest.scala index 3e284a255e..0744f97237 100644 --- a/scio-test/src/test/scala/com/spotify/scio/values/PairSkewedSCollectionFunctionsTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/values/PairSkewedSCollectionFunctionsTest.scala @@ -89,7 +89,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13))) val p = p1.skewedLeftOuterJoin(p2, Long.MaxValue, skewEps, skewSeed) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Int, Option[Int]))]( Seq(("a", (1, Some(11))), ("b", (2, Some(12))), ("b", (2, Some(13))), ("c", (3, None))) ) } @@ -101,7 +101,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13))) val p = p1.skewedLeftOuterJoin(p2, Long.MaxValue, skewEps, skewSeed) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Int, Option[Int]))]( Seq( ("a", (1, Some(11))), ("a", (2, Some(11))), @@ -120,7 +120,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13))) // set threshold to 2, to hash join on "a" val p = p1.skewedLeftOuterJoin(p2, 2, skewEps, skewSeed) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Int, Option[Int]))]( Seq( ("a", (1, Some(11))), ("a", (2, Some(11))), @@ -141,7 +141,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { // set threshold to 3, given 0.5 fraction for sample - "a" should not be hash joined val p = p1.skewedLeftOuterJoin(p2, 3, skewEps, skewSeed, sampleFraction = 0.5) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Int, Option[Int]))]( Seq( ("a", (1, Some(11))), ("a", (2, Some(11))), @@ -162,7 +162,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { // Small sample size to force empty key count val p = p1.skewedLeftOuterJoin(p2, 3, skewEps, skewSeed, sampleFraction = 0.01) - p should containInAnyOrder(Seq(("a", (2, Some(11))), ("a", (1, Some(11))), ("b", (3, None)))) + p should containInAnyOrder[(String, (Int, Option[Int]))](Seq(("a", (2, Some(11))), ("a", (1, Some(11))), ("b", (3, None)))) } } @@ -172,7 +172,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13), ("d", 14))) val p = p1.skewedFullOuterJoin(p2, Long.MaxValue, skewEps, skewSeed) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq( ("a", (Some(1), Some(11))), ("b", (Some(2), Some(12))), @@ -190,7 +190,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { val p1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4))) val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13), ("d", 14))) val p = p1.skewedFullOuterJoin(p2, Long.MaxValue, skewEps, skewSeed) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq( ("a", (Some(1), Some(11))), ("a", (Some(2), Some(11))), @@ -210,7 +210,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { val p2 = sc.parallelize(Seq(("a", 11), ("b", 12), ("b", 13), ("d", 14))) // set threshold to 2, to hash join on "a" val p = p1.skewedFullOuterJoin(p2, 2, skewEps, skewSeed) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq( ("a", (Some(1), Some(11))), ("a", (Some(2), Some(11))), @@ -233,7 +233,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { // set threshold to 3, given 0.5 fraction for sample - "a" should not be hash joined val p = p1.skewedFullOuterJoin(p2, 3, skewEps, skewSeed, sampleFraction = 0.5) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq( ("a", (Some(1), Some(11))), ("a", (Some(2), Some(11))), @@ -256,7 +256,7 @@ class PairSkewedSCollectionFunctionsTest extends PipelineSpec { // Small sample size to force empty key count val p = p1.skewedFullOuterJoin(p2, 3, skewEps, skewSeed, sampleFraction = 0.01) - p should containInAnyOrder( + p should containInAnyOrder[(String, (Option[Int], Option[Int]))]( Seq( ("a", (Some(2), Some(11))), ("a", (Some(1), Some(11))), diff --git a/scio-test/src/test/scala/com/spotify/scio/values/SCollectionTest.scala b/scio-test/src/test/scala/com/spotify/scio/values/SCollectionTest.scala index cd33230f05..252b8d6021 100644 --- a/scio-test/src/test/scala/com/spotify/scio/values/SCollectionTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/values/SCollectionTest.scala @@ -268,6 +268,7 @@ class SCollectionTest extends PipelineSpec { it should "support batchByteSized() with byte size" in { val bytes = Array.fill[Byte](4)(0) + implicit val arrayByteCoder = Coder.arrayByteCoder // Scala 3 ambigous given workaround runWithContext { sc => val p = sc .parallelize(Seq(Seq.fill(5)(bytes))) // SCollection with 1 element to get a single bundle @@ -390,6 +391,7 @@ class SCollectionTest extends PipelineSpec { } it should "support flatten()" in { + implicit val iterableOnce = Coder.stringCoder // Scala 3 ambigous given workaround runWithContext { sc => val p1 = sc.parallelize(Seq(Seq("a b", "c d"), Seq("e f", "g h"))).flatten p1 should containInAnyOrder(Seq("a b", "c d", "e f", "g h")) @@ -627,6 +629,7 @@ class SCollectionTest extends PipelineSpec { } it should "support withFixedWindows()" in { + implicit val stringCoder = Coder.stringCoder // Scala 3 ambigous given workaround runWithContext { sc => val p = sc.parallelizeTimestamped( @@ -639,6 +642,7 @@ class SCollectionTest extends PipelineSpec { } it should "support withSlidingWindows()" in { + implicit val stringCoder = Coder.stringCoder // Scala 3 ambigous given workaround runWithContext { sc => val p = sc.parallelizeTimestamped( Seq("a", "b", "c", "d", "e", "f"), @@ -653,6 +657,7 @@ class SCollectionTest extends PipelineSpec { } it should "#2745: not throw an exception for valid values" in runWithContext { sc => + implicit val stringCoder = Coder.stringCoder // Scala 3 ambigous given workaround val p = sc.parallelizeTimestamped( Seq("a", "b", "c", "d", "e", "f"), (0L to 5L).map(new Instant(_)) @@ -669,6 +674,7 @@ class SCollectionTest extends PipelineSpec { } it should "support withSessionWindows()" in { + implicit val stringCoder = Coder.stringCoder // Scala 3 ambigous given workaround runWithContext { sc => val p = sc.parallelizeTimestamped( @@ -684,6 +690,7 @@ class SCollectionTest extends PipelineSpec { } it should "support withGlobalWindow()" in { + implicit val stringCoder = Coder.stringCoder // Scala 3 ambigous given workaround runWithContext { sc => val p = sc.parallelizeTimestamped( Seq("a", "b", "c", "d", "e", "f"), diff --git a/scio-test/src/test/scala/com/spotify/scio/values/SCollectionWithSideInputTest.scala b/scio-test/src/test/scala/com/spotify/scio/values/SCollectionWithSideInputTest.scala index 4ae377b336..ccd218cf02 100644 --- a/scio-test/src/test/scala/com/spotify/scio/values/SCollectionWithSideInputTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/values/SCollectionWithSideInputTest.scala @@ -318,6 +318,7 @@ class SCollectionWithSideInputTest extends PipelineSpec { } it should "allow to wrap a view of a Map" in { + import com.spotify.scio.values.SCollection._ runWithContext { sc => val p1 = sc.parallelize(Seq(1)) val i2 = sc.parallelize(sideData).toKV.internal.apply(View.asMap()) @@ -328,6 +329,7 @@ class SCollectionWithSideInputTest extends PipelineSpec { } it should "allow to wrap a view of a MultiMap" in { + import com.spotify.scio.values.SCollection._ runWithContext { sc => val p1 = sc.parallelize(Seq(1)) val i2 = @@ -360,7 +362,7 @@ class SCollectionWithSideInputTest extends PipelineSpec { val p2 = sc.parallelize(sideData).asListSideInput val p3 = p2.map(seq => seq.map { case (k, v) => (k, v * 2) }.toSet) val s = p1.withSideInputs(p3).map((i, s) => (i, s(p3))).toSCollection - s should containSingleValue((1, sideData.map { case (k, v) => (k, v * 2) }.toSet)) + s should containSingleValue[(Int, Set[(String, Int)])]((1, sideData.map { case (k, v) => (k, v * 2) }.toSet)) } }