From a8368cffc791add1bf3a18dbe9efd837ed4971df Mon Sep 17 00:00:00 2001
From: Alex Kolb
Date: Fri, 29 Nov 2024 09:30:12 +0100
Subject: [PATCH] fix tests

---
 .../scio/bigtable/BigTableBatchDoFnTest.scala |  6 +--
 .../spotify/scio/grpc/GrpcBatchDoFnTest.scala | 42 +------------------
 2 files changed, 5 insertions(+), 43 deletions(-)

diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigTableBatchDoFnTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigTableBatchDoFnTest.scala
index 5f4f9141fd..fd579b42b9 100644
--- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigTableBatchDoFnTest.scala
+++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigTableBatchDoFnTest.scala
@@ -33,14 +33,14 @@ class BigtableBatchDoFnTest extends PipelineSpec {
   "BigtableDoFn" should "work" in {
     val fn = new TestBigtableBatchDoFn
     val output = runWithData(1 to 10)(_.parDo(fn))
-      .map(kv => (kv.getKey, kv.getValue.get().iterator().next()))
+      .map(kv => (kv.getKey, kv.getValue.get()))
     output should contain theSameElementsAs (1 to 10).map(x => (x, x.toString))
   }
 
   it should "work with cache" in {
     val fn = new TestCachingBigtableBatchDoFn
     val output = runWithData((1 to 10) ++ (6 to 15))(_.parDo(fn))
-      .map(kv => (kv.getKey, kv.getValue.get().iterator().next()))
+      .map(kv => (kv.getKey, kv.getValue.get()))
     output should have size 20
     output should contain theSameElementsAs ((1 to 10) ++ (6 to 15)).map(x => (x, x.toString))
     BigtableBatchDoFnTest.queue.asScala.toSet should contain theSameElementsAs (1 to 15)
@@ -52,7 +52,7 @@ class BigtableBatchDoFnTest extends PipelineSpec {
     val output = runWithData(Seq[Seq[Int]](1 to 4, 8 to 10))(_.flatten.parDo(fn)).map { kv =>
       val r = kv.getValue.asScala match {
-        case Success(v)                      => v.asScala.head
+        case Success(v)                      => v
         case Failure(e: CompletionException) => e.getCause.getMessage
         case Failure(e)                      => e.getMessage
       }
diff --git a/scio-grpc/src/test/scala/com/spotify/scio/grpc/GrpcBatchDoFnTest.scala b/scio-grpc/src/test/scala/com/spotify/scio/grpc/GrpcBatchDoFnTest.scala
index f728c43a15..be6abfe1e7 100644
--- a/scio-grpc/src/test/scala/com/spotify/scio/grpc/GrpcBatchDoFnTest.scala
+++ b/scio-grpc/src/test/scala/com/spotify/scio/grpc/GrpcBatchDoFnTest.scala
@@ -29,7 +29,6 @@ import com.spotify.scio.transforms.UnmatchedRequestException
 import io.grpc.netty.NettyChannelBuilder
 import io.grpc.stub.StreamObserver
 import io.grpc.{Server, ServerBuilder}
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException
 import org.scalatest.BeforeAndAfterAll
 
 import java.net.ServerSocket
@@ -167,9 +166,9 @@ class GrpcBatchDoFnTest extends PipelineSpec with BeforeAndAfterAll {
         .build()
     }
 
-    val expected: Seq[(ConcatRequestWithID, Try[Option[ConcatResponseWithID]])] = input.map { req =>
+    val expected: Seq[(ConcatRequestWithID, Try[ConcatResponseWithID])] = input.map { req =>
       val resp = concat(req)
-      req -> Success(Some(resp))
+      req -> Success(resp)
     }
 
     runWithContext { sc =>
@@ -246,43 +245,6 @@ class GrpcBatchDoFnTest extends PipelineSpec with BeforeAndAfterAll {
     }
   }
 
-  it should "throw an IllegalStateException if gRPC response contains unknown ids" in {
-    val input = (0 to 1).map { i =>
-      ConcatRequestWithID
-        .newBuilder()
-        .setRequestId(i.toString)
-        .setStringOne(i.toString)
-        .setStringTwo(i.toString)
-        .build()
-    }
-
-    assertThrows[IllegalStateException] {
-      try {
-        runWithContext { sc =>
-          sc.parallelize(input)
-            .grpcBatchLookup[
-              BatchRequest,
-              BatchResponse,
-              ConcatResponseWithID,
-              ConcatServiceFutureStub
-            ](
-              () => NettyChannelBuilder.forTarget(ServiceUri).usePlaintext().build(),
-              ConcatServiceGrpc.newFutureStub,
-              2,
-              concatBatchRequest,
-              r => r.getResponseList.asScala.toSeq.map(e => ("WrongID-" + e.getRequestId, e)),
-              idExtractor,
-              2
-            )(_.batchConcat)
-        }
-      } catch {
-        case e: PipelineExecutionException =>
-          e.getMessage should include("Missing result for request with id:")
-          throw e.getCause
-      }
-    }
-  }
-
   it should "propagate results if elements have the same id" in {
     val input = for {
       _ <- 0 to 5