Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lofifnc committed Nov 29, 2024
1 parent ed4c9b4 commit a8368cf
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ class BigtableBatchDoFnTest extends PipelineSpec {
"BigtableDoFn" should "work" in {
val fn = new TestBigtableBatchDoFn
val output = runWithData(1 to 10)(_.parDo(fn))
.map(kv => (kv.getKey, kv.getValue.get().iterator().next()))
.map(kv => (kv.getKey, kv.getValue.get()))
output should contain theSameElementsAs (1 to 10).map(x => (x, x.toString))
}

it should "work with cache" in {
val fn = new TestCachingBigtableBatchDoFn
val output = runWithData((1 to 10) ++ (6 to 15))(_.parDo(fn))
.map(kv => (kv.getKey, kv.getValue.get().iterator().next()))
.map(kv => (kv.getKey, kv.getValue.get()))
output should have size 20
output should contain theSameElementsAs ((1 to 10) ++ (6 to 15)).map(x => (x, x.toString))
BigtableBatchDoFnTest.queue.asScala.toSet should contain theSameElementsAs (1 to 15)
Expand All @@ -52,7 +52,7 @@ class BigtableBatchDoFnTest extends PipelineSpec {

val output = runWithData(Seq[Seq[Int]](1 to 4, 8 to 10))(_.flatten.parDo(fn)).map { kv =>
val r = kv.getValue.asScala match {
case Success(v) => v.asScala.head
case Success(v) => v
case Failure(e: CompletionException) => e.getCause.getMessage
case Failure(e) => e.getMessage
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import com.spotify.scio.transforms.UnmatchedRequestException
import io.grpc.netty.NettyChannelBuilder
import io.grpc.stub.StreamObserver
import io.grpc.{Server, ServerBuilder}
import org.apache.beam.sdk.Pipeline.PipelineExecutionException
import org.scalatest.BeforeAndAfterAll

import java.net.ServerSocket
Expand Down Expand Up @@ -167,9 +166,9 @@ class GrpcBatchDoFnTest extends PipelineSpec with BeforeAndAfterAll {
.build()
}

val expected: Seq[(ConcatRequestWithID, Try[Option[ConcatResponseWithID]])] = input.map { req =>
val expected: Seq[(ConcatRequestWithID, Try[ConcatResponseWithID])] = input.map { req =>
val resp = concat(req)
req -> Success(Some(resp))
req -> Success(resp)
}

runWithContext { sc =>
Expand Down Expand Up @@ -246,43 +245,6 @@ class GrpcBatchDoFnTest extends PipelineSpec with BeforeAndAfterAll {
}
}

it should "throw an IllegalStateException if gRPC response contains unknown ids" in {
val input = (0 to 1).map { i =>
ConcatRequestWithID
.newBuilder()
.setRequestId(i.toString)
.setStringOne(i.toString)
.setStringTwo(i.toString)
.build()
}

assertThrows[IllegalStateException] {
try {
runWithContext { sc =>
sc.parallelize(input)
.grpcBatchLookup[
BatchRequest,
BatchResponse,
ConcatResponseWithID,
ConcatServiceFutureStub
](
() => NettyChannelBuilder.forTarget(ServiceUri).usePlaintext().build(),
ConcatServiceGrpc.newFutureStub,
2,
concatBatchRequest,
r => r.getResponseList.asScala.toSeq.map(e => ("WrongID-" + e.getRequestId, e)),
idExtractor,
2
)(_.batchConcat)
}
} catch {
case e: PipelineExecutionException =>
e.getMessage should include("Missing result for request with id:")
throw e.getCause
}
}
}

it should "propagate results if elements have the same id" in {
val input = for {
_ <- 0 to 5
Expand Down

0 comments on commit a8368cf

Please sign in to comment.