diff --git a/build.sbt b/build.sbt index 8f990bb2e8..2705a679a4 100644 --- a/build.sbt +++ b/build.sbt @@ -37,7 +37,6 @@ val autoServiceVersion = "1.0.1" val autoValueVersion = "1.9" val avroVersion = sys.props.getOrElse("avro.version", "1.11.4") val bigdataossVersion = "2.2.26" -val bigtableClientVersion = "1.28.0" val commonsCodecVersion = "1.17.1" val commonsCompressVersion = "1.26.2" val commonsIoVersion = "2.16.1" @@ -804,6 +803,7 @@ lazy val `scio-core` = project "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion, "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion, "com.github.luben" % "zstd-jni" % zstdJniVersion, + "com.google.api" % "api-common" % gcpBom.key.value, "com.google.api" % "gax" % gcpBom.key.value, "com.google.api-client" % "google-api-client" % gcpBom.key.value, "com.google.auto.service" % "auto-service-annotations" % autoServiceVersion, @@ -1033,12 +1033,12 @@ lazy val `scio-google-cloud-platform` = project "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion, "com.fasterxml.jackson.datatype" % "jackson-datatype-joda" % jacksonVersion, "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion, + "com.google.api" % "api-common" % gcpBom.key.value, "com.google.api" % "gax" % gcpBom.key.value, "com.google.api" % "gax-grpc" % gcpBom.key.value, "com.google.api-client" % "google-api-client" % gcpBom.key.value, "com.google.api.grpc" % "grpc-google-cloud-pubsub-v1" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-bigquerystorage-v1" % gcpBom.key.value, - "com.google.api.grpc" % "proto-google-cloud-bigtable-admin-v2" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-datastore-v1" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % gcpBom.key.value, @@ -1050,8 +1050,6 @@ lazy val `scio-google-cloud-platform` = project "com.google.cloud" % "google-cloud-core" % gcpBom.key.value, "com.google.cloud" % "google-cloud-spanner" % gcpBom.key.value, "com.google.cloud.bigdataoss" % "util" % bigdataossVersion, - "com.google.cloud.bigtable" % "bigtable-client-core" % bigtableClientVersion, - "com.google.cloud.bigtable" % "bigtable-client-core-config" % bigtableClientVersion, "com.google.guava" % "guava" % guavaVersion, "com.google.http-client" % "google-http-client" % gcpBom.key.value, "com.google.http-client" % "google-http-client-gson" % gcpBom.key.value, diff --git a/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala b/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala index 29e0407d3c..779227770f 100644 --- a/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala @@ -18,11 +18,8 @@ package com.spotify.scio.bigtable import java.util.UUID - -import com.google.bigtable.admin.v2.{DeleteTableRequest, GetTableRequest, ListTablesRequest} import com.google.bigtable.v2.{Mutation, Row, RowFilter} -import com.google.cloud.bigtable.config.BigtableOptions -import com.google.cloud.bigtable.grpc._ +import com.google.cloud.bigtable.admin.v2.{BigtableInstanceAdminClient, BigtableTableAdminClient} import com.google.protobuf.ByteString import com.spotify.scio._ import com.spotify.scio.testing._ @@ -41,12 +38,6 @@ object BigtableIT { def testData(id: String): Seq[(String, Long)] = Seq((s"$id-key1", 1L), (s"$id-key2", 2L), (s"$id-key3", 3L)) - val bigtableOptions: BigtableOptions 
= BigtableOptions
-    .builder()
-    .setProjectId(projectId)
-    .setInstanceId(instanceId)
-    .build
-
   val FAMILY_NAME: String = "count"
   val COLUMN_QUALIFIER: ByteString = ByteString.copyFromUtf8("long")
@@ -67,12 +58,6 @@ object BigtableIT {
 
   def fromRow(r: Row): (String, Long) =
     (r.getKey.toStringUtf8, r.getValue(FAMILY_NAME, COLUMN_QUALIFIER).get.toStringUtf8.toLong)
-
-  def listTables(client: BigtableTableAdminGrpcClient): Set[String] = {
-    val instancePath = s"projects/$projectId/instances/$instanceId"
-    val tables = client.listTables(ListTablesRequest.newBuilder().setParent(instancePath).build)
-    tables.getTablesList.asScala.map(t => new BigtableTableName(t.getName).getTableId).toSet
-  }
 }
 
 class BigtableIT extends PipelineSpec {
@@ -80,18 +65,22 @@
   // "Update number of bigtable nodes" should "work" in {
   ignore should "update number of bigtable nodes" in {
-    val bt = new BigtableClusterUtilities(bigtableOptions)
-    val sc = ScioContext()
-    sc.updateNumberOfBigtableNodes(projectId, instanceId, 4, Duration.standardSeconds(10))
-    sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 4
-    bt.getClusterNodeCount(clusterId, zoneId) shouldBe 4
-    sc.updateNumberOfBigtableNodes(projectId, instanceId, 3, Duration.standardSeconds(10))
-    sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 3
-    bt.getClusterNodeCount(clusterId, zoneId) shouldBe 3
+    val client = BigtableInstanceAdminClient.create(projectId)
+    try {
+      val sc = ScioContext()
+      sc.updateNumberOfBigtableNodes(projectId, instanceId, 4, Duration.standardSeconds(10))
+      sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 4
+      // BigtableInstanceAdminClient#getCluster takes (instanceId, clusterId)
+      client.getCluster(instanceId, clusterId).getServeNodes shouldBe 4
+      sc.updateNumberOfBigtableNodes(projectId, instanceId, 3, Duration.standardSeconds(10))
+      sc.getBigtableClusterSizes(projectId, instanceId)(clusterId) shouldBe 3
+      client.getCluster(instanceId, clusterId).getServeNodes shouldBe 3
+    } finally {
+      client.close()
+    }
   }
 
   "BigtableIO" should "work in default mode" in {
-    TableAdmin.ensureTables(bigtableOptions, Map(tableId -> List(FAMILY_NAME)))
+    Admin.Table.ensureTable(projectId, instanceId, tableId, List(FAMILY_NAME))
     val id = testId()
     val data = testData(id)
     try {
@@ -126,12 +115,7 @@ class BigtableIT extends PipelineSpec {
   }
 
   it should "work in bulk mode" in {
-    TableAdmin.ensureTables(bigtableOptions, Map(tableId -> List(FAMILY_NAME)))
-    val options = BigtableOptions
-      .builder()
-      .setProjectId(projectId)
-      .setInstanceId(instanceId)
-      .build()
+    Admin.Table.ensureTable(projectId, instanceId, tableId, List(FAMILY_NAME))
     val id = testId()
     val data = testData(id)
 
@@ -140,7 +124,7 @@ class BigtableIT extends PipelineSpec {
       runWithRealContext() { sc =>
         sc
          .parallelize(data.map(kv => toWriteMutation(kv._1, kv._2)))
-          .saveAsBigtable(options, tableId, 1)
+          .saveAsBigtable(projectId, instanceId, tableId)
       }.waitUntilDone()
 
       // Read rows back
@@ -166,48 +150,48 @@ class BigtableIT extends PipelineSpec {
     }.waitUntilFinish()
   }
 
-  "TableAdmin" should "work" in {
+  "Admin.Table" should "work" in {
     val id = testId()
     val tables = Map(
       s"scio-bigtable-empty-table-$id" -> List(),
       s"scio-bigtable-one-cf-table-$id" -> List("colfam1"),
       s"scio-bigtable-two-cf-table-$id" -> List("colfam1", "colfam2")
     )
-    val channel = ChannelPoolCreator.createPool(bigtableOptions)
-    val executorService = BigtableSessionSharedThreadPools.getInstance().getRetryExecutor
-    val client = new BigtableTableAdminGrpcClient(channel, executorService, bigtableOptions)
-    val instancePath = s"projects/$projectId/instances/$instanceId"
-    val tableIds = tables.keys.toSet
-    def tablePath(table: String): String = s"$instancePath/tables/$table"
-    def deleteTable(table: String): Unit =
-      client.deleteTable(DeleteTableRequest.newBuilder().setName(tablePath(table)).build)
-
-    // Delete any tables that could be left around from previous IT run.
-    val oldTables = listTables(client).intersect(tableIds)
-    oldTables.foreach(deleteTable)
-
-    // Ensure that the tables don't exist now
-    listTables(client).intersect(tableIds) shouldBe empty
-
-    // Run UUT
-    TableAdmin.ensureTables(bigtableOptions, tables)
-
-    // Tables must exist
-    listTables(client).intersect(tableIds) shouldEqual tableIds
-
-    // Assert Column families exist
-    for ((table, columnFamilies) <- tables) {
-      val tableInfo = client.getTable(
-        GetTableRequest
-          .newBuilder()
-          .setName(tablePath(table))
-          .build
-      )
-      val actualColumnFamilies = tableInfo.getColumnFamiliesMap.asScala.keys
-      actualColumnFamilies should contain theSameElementsAs columnFamilies
-    }
-    // Clean up and delete
-    tables.keys.foreach(deleteTable)
+    val client = BigtableTableAdminClient.create(projectId, instanceId)
+    try {
+      val tableIds = tables.keys.toSet
+
+      // Delete any tables from this suite that could be left around from a previous IT run
+      client
+        .listTables()
+        .asScala
+        .filter(tableIds.contains)
+        .foreach(client.deleteTable)
+
+      // Ensure that the tables don't exist now
+      client.listTables().asScala.toSet.intersect(tableIds) shouldBe empty
+
+      // Run UUT
+      tables.foreach { case (tableId, cfs) =>
+        Admin.Table.ensureTable(projectId, instanceId, tableId, cfs)
+      }
+
+      // Tables must exist
+      client.listTables().asScala should contain allElementsOf tableIds
+
+      // Assert Column families exist
+      tables.foreach { case (id, columnFamilies) =>
+        val table = client.getTable(id)
+        val actualFamilies = table.getColumnFamilies.asScala.map(_.getId)
+
+        actualFamilies should contain theSameElementsAs columnFamilies
+      }
+
+      // Clean up and delete
+      tableIds.foreach(client.deleteTable)
+    } finally {
+      client.close()
+    }
   }
 }
diff --git a/scio-core/src/main/java/com/spotify/scio/transforms/FutureHandlers.java b/scio-core/src/main/java/com/spotify/scio/transforms/FutureHandlers.java
index 72251199d2..1012c4b5f6 100644
--- a/scio-core/src/main/java/com/spotify/scio/transforms/FutureHandlers.java
+++ b/scio-core/src/main/java/com/spotify/scio/transforms/FutureHandlers.java
@@ -17,6 +17,10 @@
 package com.spotify.scio.transforms;
 
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
 import com.google.common.util.concurrent.*;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
@@ -162,4 +166,78 @@ default CompletableFuture<V> addCallback(
       });
     }
   }
+
+  /**
+   * A {@link Base} implementation for Google API {@link ApiFuture}. Similar to Guava's
+   * ListenableFuture, but redeclared so that Guava can be shaded.
+   */
+  public interface GoogleApi<V> extends Base<ApiFuture<V>, V> {
+    /**
+     * Executor used for callbacks. Default is {@link ForkJoinPool#commonPool()}. Consider
+     * overriding this method if callbacks are blocking.
+     *
+     * @return Executor for callbacks.
+     */
+    default Executor getCallbackExecutor() {
+      return ForkJoinPool.commonPool();
+    }
+
+    @Override
+    default void waitForFutures(Iterable<ApiFuture<V>> futures)
+        throws InterruptedException, ExecutionException {
+      // use ApiFutures#successfulAsList instead of ApiFutures#allAsList, which only works if all
+      // futures succeed
+      ApiFutures.successfulAsList(futures).get();
+    }
+
+    @Override
+    default ApiFuture<V> addCallback(
+        ApiFuture<V> future, Function<V, Void> onSuccess, Function<Throwable, Void> onFailure) {
+      // ApiFutures#transform doesn't allow an onFailure callback, while ApiFutures#addCallback
+      // doesn't guarantee that callbacks are called before ApiFuture#get() unblocks
+      SettableApiFuture<V> f = SettableApiFuture.create();
+      // if the executor rejects the callback, we have to fail the future
+      Executor rejectPropagationExecutor =
+          command -> {
+            try {
+              getCallbackExecutor().execute(command);
+            } catch (RejectedExecutionException e) {
+              f.setException(e);
+            }
+          };
+      ApiFutures.addCallback(
+          future,
+          new ApiFutureCallback<V>() {
+            @Override
+            public void onSuccess(@Nullable V result) {
+              try {
+                onSuccess.apply(result);
+                f.set(result);
+              } catch (Throwable e) {
+                f.setException(e);
+              }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+              Throwable callbackException = null;
+              try {
+                onFailure.apply(t);
+              } catch (Throwable e) {
+                // do not fail the executing thread if the callback fails;
+                // record the exception and propagate it as suppressed
+                callbackException = e;
+              } finally {
+                if (callbackException != null) {
+                  t.addSuppressed(callbackException);
+                }
+                f.setException(t);
+              }
+            }
+          },
+          rejectPropagationExecutor);
+
+      return f;
+    }
+  }
 }
diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala
index 1ef57e301b..1187e04b7b 100644
--- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala
+++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/BigtableExample.scala
@@ -72,13 +72,12 @@ object BigtableWriteExample {
     // before the ingestion starts.
     sc.updateNumberOfBigtableNodes(btProjectId, btInstanceId, 15)
 
-    // Ensure that destination tables and column families exist
-    sc.ensureTables(
+    // Ensure that destination table and column families exist
+    sc.ensureTable(
       btProjectId,
       btInstanceId,
-      Map(
-        btTableId -> List(BigtableExample.FAMILY_NAME)
-      )
+      btTableId,
+      List(BigtableExample.FAMILY_NAME)
     )
 
     sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR))
diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBatchDoFn.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBatchDoFn.java
index f57e748ba0..768f3ad0cd 100644
--- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBatchDoFn.java
+++ b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBatchDoFn.java
@@ -17,13 +17,14 @@
 package com.spotify.scio.bigtable;
 
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.spotify.scio.transforms.BaseAsyncLookupDoFn;
 import com.spotify.scio.transforms.GuavaAsyncBatchLookupDoFn;
 import java.io.IOException;
 import java.util.List;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.commons.lang3.tuple.Pair;
@@ -38,44 +39,45 @@
  * @param <Result> Bigtable lookup value type.
  */
 public abstract class BigtableBatchDoFn<Input, BatchRequest, BatchResponse, Result>
-    extends GuavaAsyncBatchLookupDoFn<Input, BatchRequest, BatchResponse, Result, BigtableSession> {
+    extends GuavaAsyncBatchLookupDoFn<
+        Input, BatchRequest, BatchResponse, Result, BigtableDataClient> {
 
-  private final BigtableOptions options;
+  private final Supplier<BigtableDataSettings> settingsSupplier;
 
   /** Perform asynchronous Bigtable lookup. */
   public abstract ListenableFuture<BatchResponse> asyncLookup(
-      BigtableSession session, BatchRequest batchRequest);
+      BigtableDataClient client, BatchRequest batchRequest);
 
   /**
    * Create a {@link BigtableBatchDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    */
   public BigtableBatchDoFn(
-      BigtableOptions options,
+      Supplier<BigtableDataSettings> settingsSupplier,
       int batchSize,
       SerializableFunction<List<Input>, BatchRequest> batchRequestFn,
       SerializableFunction<BatchResponse, List<Pair<String, Result>>> batchResponseFn,
       SerializableFunction<Input, String> idExtractorFn) {
-    this(options, batchSize, batchRequestFn, batchResponseFn, idExtractorFn, 1000);
+    this(settingsSupplier, batchSize, batchRequestFn, batchResponseFn, idExtractorFn, 1000);
   }
 
   /**
    * Create a {@link BigtableBatchDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    * @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
    *     prevents runner from timing out and retrying bundles.
    */
   public BigtableBatchDoFn(
-      BigtableOptions options,
+      Supplier<BigtableDataSettings> settingsSupplier,
       int batchSize,
       SerializableFunction<List<Input>, BatchRequest> batchRequestFn,
       SerializableFunction<BatchResponse, List<Pair<String, Result>>> batchResponseFn,
       SerializableFunction<Input, String> idExtractorFn,
       int maxPendingRequests) {
     this(
-        options,
+        settingsSupplier,
        batchSize,
        batchRequestFn,
        batchResponseFn,
        idExtractorFn,
@@ -87,13 +89,13 @@ public BigtableBatchDoFn(
   /**
    * Create a {@link BigtableBatchDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    * @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
    *     prevents runner from timing out and retrying bundles.
    * @param cacheSupplier supplier for lookup cache.
    */
   public BigtableBatchDoFn(
-      BigtableOptions options,
+      Supplier<BigtableDataSettings> settingsSupplier,
       int batchSize,
       SerializableFunction<List<Input>, BatchRequest> batchRequestFn,
       SerializableFunction<BatchResponse, List<Pair<String, Result>>> batchResponseFn,
       SerializableFunction<Input, String> idExtractorFn,
       int maxPendingRequests,
       BaseAsyncLookupDoFn.CacheSupplier<String, Result> cacheSupplier) {
     super(
         batchSize,
         batchRequestFn,
         batchResponseFn,
         idExtractorFn,
         maxPendingRequests,
         cacheSupplier);
-    this.options = options;
+    this.settingsSupplier = settingsSupplier;
   }
 
   @Override
@@ -116,9 +118,9 @@ public ResourceType getResourceType() {
     return ResourceType.PER_INSTANCE;
   }
 
-  protected BigtableSession newClient() {
+  protected BigtableDataClient newClient() {
     try {
-      return new BigtableSession(options);
+      return BigtableDataClient.create(settingsSupplier.get());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBulkWriter.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBulkWriter.java
deleted file mode 100644
index b8eb71eac1..0000000000
--- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableBulkWriter.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Copyright 2018 Spotify AB.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
-
- */ - -package com.spotify.scio.bigtable; - -import com.google.bigtable.v2.Mutation; -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceHelper; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BigtableBulkWriter - extends PTransform>>, PDone> { - - private static final Logger LOG = LoggerFactory.getLogger(BigtableBulkWriter.class); - - private final BigtableOptions bigtableOptions; - private final String tableName; - private final int numOfShards; - private final Duration flushInterval; - - public BigtableBulkWriter( - final String tableName, - final BigtableOptions bigtableOptions, - final int numOfShards, - final Duration flushInterval) { - this.bigtableOptions = bigtableOptions; - this.tableName = tableName; - this.numOfShards = numOfShards; - this.flushInterval = flushInterval; - } - - @Override - public PDone expand(PCollection>> input) { - createBulkShards(input, numOfShards, flushInterval) - .apply("Bigtable BulkWrite", ParDo.of(new BigtableBulkWriterFn())); - return PDone.in(input.getPipeline()); - } - - @VisibleForTesting - static PCollection>>> createBulkShards( - final PCollection>> input, - final int numOfShards, - final Duration flushInterval) { - return input - .apply("Assign To Shard", ParDo.of(new AssignToShard(numOfShards))) - .apply( - "Window", - Window.>>>into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(flushInterval))) - .discardingFiredPanes() - .withAllowedLateness(Duration.ZERO)) - .apply("Group By Shard", GroupByKey.create()) - .apply( - "Gets Mutations", - ParDo.of( - new DoFn< - KV>>>, - Iterable>>>() { - @ProcessElement - public void process( - @Element KV>>> element, - OutputReceiver>>> out) { - out.output(element.getValue()); - } - })); - } - - private class BigtableBulkWriterFn - extends DoFn>>, Void> { - - private BigtableServiceHelper.Writer bigtableWriter; - private long recordsWritten; - private final ConcurrentLinkedQueue failures; - - public BigtableBulkWriterFn() { - this.failures = new ConcurrentLinkedQueue<>(); - } - - @StartBundle - public void startBundle(StartBundleContext c) throws IOException { - bigtableWriter = - new BigtableServiceHelper(bigtableOptions, c.getPipelineOptions()) - .openForWriting(tableName); - recordsWritten = 0; - } - - @ProcessElement - public void processElement(@Element Iterable>> element) - throws Exception { - checkForFailures(failures); - for (KV> r : element) { - bigtableWriter - 
.writeRecord(r) - .whenComplete( - (mutationResult, exception) -> { - if (exception != null) { - failures.add(new BigtableWriteException(r, exception)); - } - }); - ++recordsWritten; - } - } - - @FinishBundle - public void finishBundle() throws Exception { - // close the writer and wait for all writes to complete - bigtableWriter.close(); - checkForFailures(failures); - LOG.debug("Wrote {} records", recordsWritten); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("Records Written", recordsWritten)); - } - - /** If any write has asynchronously failed, fail the bundle with a useful error. */ - private void checkForFailures(final ConcurrentLinkedQueue failures) - throws IOException { - // Note that this function is never called by multiple threads and is the only place that - // we remove from failures, so this code is safe. - if (failures.isEmpty()) { - return; - } - - StringBuilder logEntry = new StringBuilder(); - int i = 0; - List suppressed = new ArrayList<>(); - for (; i < 10 && !failures.isEmpty(); ++i) { - BigtableWriteException exc = failures.remove(); - logEntry.append("\n").append(exc.getMessage()); - if (exc.getCause() != null) { - logEntry.append(": ").append(exc.getCause().getMessage()); - } - suppressed.add(exc); - } - String message = - String.format( - "At least %d errors occurred writing to Bigtable. First %d errors: %s", - i + failures.size(), i, logEntry.toString()); - LOG.error(message); - IOException exception = new IOException(message); - for (BigtableWriteException e : suppressed) { - exception.addSuppressed(e); - } - throw exception; - } - - /** An exception that puts information about the failed record being written in its message. 
*/
-    class BigtableWriteException extends IOException {
-
-      public BigtableWriteException(
-          final KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
-        super(
-            String.format(
-                "Error mutating row %s with mutations %s",
-                record.getKey().toStringUtf8(), record.getValue()),
-            cause);
-      }
-    }
-  }
-
-  static class AssignToShard
-      extends DoFn<
-          KV<ByteString, Iterable<Mutation>>, KV<Long, KV<ByteString, Iterable<Mutation>>>> {
-
-    private final int numOfShards;
-
-    AssignToShard(final int numOfShards) {
-      this.numOfShards = numOfShards;
-    }
-
-    @ProcessElement
-    public void processElement(
-        @Element KV<ByteString, Iterable<Mutation>> element,
-        OutputReceiver<KV<Long, KV<ByteString, Iterable<Mutation>>>> out) {
-      // assign this element to a random shard
-      final long shard = ThreadLocalRandom.current().nextLong(numOfShards);
-      out.output(KV.of(shard, element));
-    }
-  }
-}
diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java
index 882d8363b0..2c9f5f1275 100644
--- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java
+++ b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableDoFn.java
@@ -17,13 +17,13 @@
 package com.spotify.scio.bigtable;
 
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.grpc.BigtableSession;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
 import com.spotify.scio.transforms.BaseAsyncLookupDoFn;
-import com.spotify.scio.transforms.GuavaAsyncLookupDoFn;
+import com.spotify.scio.transforms.FutureHandlers;
 import java.io.IOException;
-import java.time.Duration;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
@@ -32,53 +32,55 @@
  * @param <A> input element type.
  * @param <B> Bigtable lookup value type.
  */
-public abstract class BigtableDoFn<A, B> extends GuavaAsyncLookupDoFn<A, B, BigtableSession> {
+public abstract class BigtableDoFn<A, B>
+    extends BaseAsyncLookupDoFn<A, B, BigtableDataClient, ApiFuture<B>, BaseAsyncLookupDoFn.Try<B>>
+    implements FutureHandlers.GoogleApi<B> {
 
-  private final BigtableOptions options;
+  private final Supplier<BigtableDataSettings> settingsSupplier;
 
   /** Perform asynchronous Bigtable lookup. */
-  public abstract ListenableFuture<B> asyncLookup(BigtableSession session, A input);
+  public abstract ApiFuture<B> asyncLookup(BigtableDataClient client, A input);
 
   /**
    * Create a {@link BigtableDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    */
-  public BigtableDoFn(BigtableOptions options) {
-    this(options, 1000);
+  public BigtableDoFn(Supplier<BigtableDataSettings> settingsSupplier) {
+    this(settingsSupplier, 1000);
   }
 
   /**
    * Create a {@link BigtableDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    * @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
    *     prevents runner from timing out and retrying bundles.
   */
-  public BigtableDoFn(BigtableOptions options, int maxPendingRequests) {
-    this(options, maxPendingRequests, new BaseAsyncLookupDoFn.NoOpCacheSupplier<>());
+  public BigtableDoFn(Supplier<BigtableDataSettings> settingsSupplier, int maxPendingRequests) {
+    this(settingsSupplier, maxPendingRequests, new BaseAsyncLookupDoFn.NoOpCacheSupplier<>());
   }
 
   /**
    * Create a {@link BigtableDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    * @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
    *     prevents runner from timing out and retrying bundles.
    * @param cacheSupplier supplier for lookup cache.
    */
   public BigtableDoFn(
-      BigtableOptions options,
+      Supplier<BigtableDataSettings> settingsSupplier,
       int maxPendingRequests,
       BaseAsyncLookupDoFn.CacheSupplier<A, B> cacheSupplier) {
     super(maxPendingRequests, cacheSupplier);
-    this.options = options;
+    this.settingsSupplier = settingsSupplier;
   }
 
   /**
    * Create a {@link BigtableDoFn} instance.
    *
-   * @param options Bigtable options.
+   * @param settingsSupplier Bigtable data settings supplier.
    * @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
    *     prevents runner from timing out and retrying bundles.
    * @param deduplicate if an attempt should be made to de-duplicate simultaneous requests for the
    *     same input
    * @param cacheSupplier supplier for lookup cache.
    */
   public BigtableDoFn(
-      BigtableOptions options,
+      Supplier<BigtableDataSettings> settingsSupplier,
       int maxPendingRequests,
       boolean deduplicate,
       BaseAsyncLookupDoFn.CacheSupplier<A, B> cacheSupplier) {
     super(maxPendingRequests, deduplicate, cacheSupplier);
-    this.options = options;
+    this.settingsSupplier = settingsSupplier;
   }
 
   @Override
@@ -100,16 +102,21 @@ public ResourceType getResourceType() {
     return ResourceType.PER_INSTANCE;
   }
 
-  @Override
-  public Duration getTimeout() {
-    return Duration.ofMillis(options.getCallOptionsConfig().getMutateRpcTimeoutMs());
-  }
-
-  protected BigtableSession newClient() {
+  protected BigtableDataClient newClient() {
     try {
-      return new BigtableSession(options);
+      return BigtableDataClient.create(settingsSupplier.get());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
+
+  @Override
+  public BaseAsyncLookupDoFn.Try<B> success(B output) {
+    return new Try<>(output);
+  }
+
+  @Override
+  public BaseAsyncLookupDoFn.Try<B> failure(Throwable throwable) {
+    return new Try<>(throwable);
+  }
 }
diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableUtil.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableUtil.java
deleted file mode 100644
index 752dc41976..0000000000
--- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/BigtableUtil.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Copyright 2016 Spotify AB.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ - -package com.spotify.scio.bigtable; - -import com.google.bigtable.admin.v2.Cluster; -import com.google.bigtable.admin.v2.ListClustersRequest; -import com.google.bigtable.admin.v2.ListClustersResponse; -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.bigtable.grpc.BigtableClusterUtilities; -import com.google.cloud.bigtable.grpc.BigtableInstanceClient; -import com.google.cloud.bigtable.grpc.BigtableInstanceGrpcClient; -import com.google.cloud.bigtable.grpc.io.ChannelPool; -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import org.joda.time.Duration; -import org.joda.time.format.PeriodFormatter; -import org.joda.time.format.PeriodFormatterBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Utilities to deal with Bigtable. */ -public final class BigtableUtil { - - private BigtableUtil() {} - - private static final Logger LOG = LoggerFactory.getLogger(BigtableUtil.class); - - private static final PeriodFormatter formatter = - new PeriodFormatterBuilder() - .appendDays() - .appendSuffix("d") - .appendHours() - .appendSuffix("h") - .appendMinutes() - .appendSuffix("m") - .appendSeconds() - .appendSuffix("s") - .toFormatter(); - - /** - * Updates all clusters within the specified Bigtable instance to a specified number of nodes. - * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the - * end to lower costs yet still get high throughput during bulk ingests/dumps. - * - * @param bigtableOptions Bigtable Options - * @param numberOfNodes New number of nodes in the cluster - * @param sleepDuration How long to sleep after updating the number of nodes. Google recommends at - * least 20 minutes before the new nodes are fully functional - * @throws IOException If setting up channel pool fails - * @throws InterruptedException If sleep fails - */ - public static void updateNumberOfBigtableNodes( - final BigtableOptions bigtableOptions, final int numberOfNodes, final Duration sleepDuration) - throws IOException, InterruptedException { - updateNumberOfBigtableNodes( - bigtableOptions, numberOfNodes, sleepDuration, Collections.emptySet()); - } - - /** - * Updates all clusters within the specified Bigtable instance to a specified number of nodes. - * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the - * end to lower costs yet still get high throughput during bulk ingests/dumps. - * - * @param bigtableOptions Bigtable Options - * @param numberOfNodes New number of nodes in the cluster - * @param sleepDuration How long to sleep after updating the number of nodes. 
Google recommends at - * least 20 minutes before the new nodes are fully functional - * @param clusterNames Names of clusters to be updated, all if empty - * @throws IOException If setting up channel pool fails - * @throws InterruptedException If sleep fails - */ - public static void updateNumberOfBigtableNodes( - final BigtableOptions bigtableOptions, - final int numberOfNodes, - final Duration sleepDuration, - final Set clusterNames) - throws IOException, InterruptedException { - final ChannelPool channelPool = ChannelPoolCreator.createPool(bigtableOptions); - - try { - final BigtableInstanceClient bigtableInstanceClient = - new BigtableInstanceGrpcClient(channelPool); - - final String instanceName = bigtableOptions.getInstanceName().toString(); - - // Fetch clusters in Bigtable instance - final ListClustersRequest clustersRequest = - ListClustersRequest.newBuilder().setParent(instanceName).build(); - final ListClustersResponse clustersResponse = - bigtableInstanceClient.listCluster(clustersRequest); - final List clustersToUpdate = - clusterNames.isEmpty() - ? clustersResponse.getClustersList() - : clustersResponse.getClustersList().stream() - .filter(c -> clusterNames.contains(shorterName(c.getName()))) - .collect(Collectors.toList()); - - // For each cluster update the number of nodes - for (Cluster cluster : clustersToUpdate) { - final Cluster updatedCluster = - Cluster.newBuilder().setName(cluster.getName()).setServeNodes(numberOfNodes).build(); - LOG.info("Updating number of nodes to {} for cluster {}", numberOfNodes, cluster.getName()); - bigtableInstanceClient.updateCluster(updatedCluster); - } - - // Wait for the new nodes to be provisioned - if (sleepDuration.getMillis() > 0) { - LOG.info("Sleeping for {} after update", formatter.print(sleepDuration.toPeriod())); - Thread.sleep(sleepDuration.getMillis()); - } - } finally { - channelPool.shutdownNow(); - } - } - - /** - * Get size of all clusters for specified Bigtable instance. - * - * @param projectId GCP projectId - * @param instanceId Bigtable instanceId - * @return map of clusterId to its number of nodes - * @throws IOException If setting up channel pool fails - * @throws GeneralSecurityException If security-related exceptions occurs - */ - public static Map getClusterSizes( - final String projectId, final String instanceId) - throws IOException, GeneralSecurityException { - try (BigtableClusterUtilities clusterUtil = - BigtableClusterUtilities.forInstance(projectId, instanceId)) { - return Collections.unmodifiableMap( - clusterUtil.getClusters().getClustersList().stream() - .collect( - Collectors.toMap( - cn -> cn.getName().substring(cn.getName().indexOf("/clusters/") + 10), - Cluster::getServeNodes))); - } - } - - static String shorterName(String name) { - if (name.lastIndexOf('/') != -1) { - return name.substring(name.lastIndexOf('/') + 1, name.length()); - } else { - return name; - } - } -} diff --git a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/ChannelPoolCreator.java b/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/ChannelPoolCreator.java deleted file mode 100644 index 3303c1d6eb..0000000000 --- a/scio-google-cloud-platform/src/main/java/com/spotify/scio/bigtable/ChannelPoolCreator.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2016 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.bigtable; - -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.bigtable.grpc.BigtableSession; -import com.google.cloud.bigtable.grpc.io.ChannelPool; -import com.google.cloud.bigtable.grpc.io.CredentialInterceptorCache; -import io.grpc.ClientInterceptor; -import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ChannelPoolCreator { - private static final Logger LOG = LoggerFactory.getLogger(ChannelPoolCreator.class); - - private static ClientInterceptor[] getClientInterceptors(final BigtableOptions options) { - try { - final ClientInterceptor interceptor = - CredentialInterceptorCache.getInstance() - .getCredentialsInterceptor(options.getCredentialOptions()); - // If credentials are unset (i.e. via local emulator), CredentialsInterceptor will return null - if (interceptor == null) { - return new ClientInterceptor[] {}; - } else { - return new ClientInterceptor[] {interceptor}; - } - } catch (Exception e) { - LOG.error( - "Failed to get credentials interceptor. No interceptor will be used for the channel.", e); - return new ClientInterceptor[] {}; - } - } - - public static ChannelPool createPool(final BigtableOptions options) throws IOException { - final ClientInterceptor[] interceptors = getClientInterceptors(options); - - return new ChannelPool( - () -> - BigtableSession.createNettyChannel( - options.getAdminHost(), options, false, interceptors), - 1); - } -} diff --git a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceHelper.java b/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceHelper.java deleted file mode 100644 index dc185b6f51..0000000000 --- a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceHelper.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2018 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigtable; - -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.bigtable.data.v2.BigtableDataSettings; -import java.io.IOException; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; - -/** Wrap {@link BigtableServiceImpl} and expose package private methods. 
*/ -public class BigtableServiceHelper extends BigtableServiceImpl { - private static final BigtableConfig EMPTY_CONFIG = - BigtableConfig.builder().setValidate(true).build(); - - public BigtableServiceHelper(BigtableOptions bigtableOptions, PipelineOptions pipelineOptions) - throws IOException { - super(translateToVeneerSettings(bigtableOptions, pipelineOptions)); - } - - public Writer openForWriting(String tableId) { - BigtableWriteOptions options = - BigtableWriteOptions.builder().setTableId(StaticValueProvider.of(tableId)).build(); - return openForWriting(options); - } - - private static BigtableDataSettings translateToVeneerSettings( - BigtableOptions bigtableOptions, PipelineOptions pipelineOptions) throws IOException { - final BigtableConfig config = - BigtableConfigTranslator.translateToBigtableConfig(EMPTY_CONFIG, bigtableOptions); - return BigtableConfigTranslator.translateToVeneerSettings(config, pipelineOptions); - } -} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala new file mode 100644 index 0000000000..d443e5518c --- /dev/null +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/Admin.scala @@ -0,0 +1,271 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.bigtable + +import com.google.cloud.bigtable.admin.v2.models.GCRules.GCRule +import com.google.cloud.bigtable.admin.v2.models.{ + CreateTableRequest, + GCRules, + ModifyColumnFamiliesRequest +} +import com.google.cloud.bigtable.admin.v2.{BigtableInstanceAdminClient, BigtableTableAdminClient} +import org.joda.time.Duration +import org.slf4j.{Logger, LoggerFactory} + +import java.util.concurrent.TimeUnit + +import scala.collection.concurrent +import scala.collection.concurrent.TrieMap +import scala.jdk.CollectionConverters._ + +/** + * Bigtable Table Admin API helper commands. + * + * Caches Bigtable clients and exposes basic operations + */ +object Admin { + + private val logger: Logger = LoggerFactory.getLogger(this.getClass) + sys.addShutdownHook { + logger.info("Shutting down Bigtable clients") + Instance.clients.values.foreach(_.close()) + Table.clients.values.foreach(_.close()) + } + + object Table { + + sealed trait CreateDisposition + object CreateDisposition { + case object Never extends CreateDisposition + case object CreateIfNeeded extends CreateDisposition + + val Default: CreateDisposition = CreateIfNeeded + } + + private[bigtable] val clients: concurrent.Map[(String, String), BigtableTableAdminClient] = + TrieMap.empty + private def getOrCreateClient( + projectId: String, + instanceId: String + ): BigtableTableAdminClient = { + val key = (projectId, instanceId) + clients.getOrElseUpdate( + key, + BigtableTableAdminClient.create(projectId, instanceId) + ) + } + + /** + * Retrieves a set of tables from the given instancePath. + * + * @param client + * Client for calling Bigtable. 
* @return
+     *   Set of table ids present in the instance.
+     */
+    private def fetchTables(client: BigtableTableAdminClient): Set[String] =
+      client.listTables().asScala.toSet
+
+    /**
+     * Ensure that a table and its column families exist. Checks for existence of the table and
+     * creates it if it does not exist. Also checks for existence of the column families within
+     * the table and creates them if they do not exist.
+     *
+     * @param tableId
+     *   Id of the table to ensure.
+     * @param columnFamilies
+     *   A list of column family names.
+     */
+    def ensureTable(
+      projectId: String,
+      instanceId: String,
+      tableId: String,
+      columnFamilies: Iterable[String],
+      createDisposition: CreateDisposition = CreateDisposition.Default
+    ): Unit = {
+      val tcf = columnFamilies.map(cf => cf -> None)
+      ensureTableImpl(projectId, instanceId, tableId, tcf, createDisposition)
+    }
+
+    /**
+     * Ensure that a table and its column families exist. Checks for existence of the table and
+     * creates it if it does not exist. Also checks for existence of the column families within
+     * the table and creates them if they do not exist.
+     *
+     * @param tableId
+     *   Id of the table to ensure.
+     * @param columnFamilies
+     *   A list of column family names along with the desired cell expiration. Cell expiration is
+     *   the duration before which garbage collection of a cell may occur. Note: minimum
+     *   granularity is one second.
+     */
+    def ensureTableWithExpiration(
+      projectId: String,
+      instanceId: String,
+      tableId: String,
+      columnFamilies: Iterable[(String, Option[Duration])],
+      createDisposition: CreateDisposition = CreateDisposition.Default
+    ): Unit = {
+      // Convert Duration to GCRule
+      val x = columnFamilies.map { case (columnFamily, duration) =>
+        (columnFamily, duration.map(maxAgeGcRule))
+      }
+      ensureTableImpl(projectId, instanceId, tableId, x, createDisposition)
+    }
+
+    private def maxAgeGcRule(duration: Duration): GCRule =
+      GCRules.GCRULES.maxAge(duration.getStandardSeconds, TimeUnit.SECONDS)
+
+    /**
+     * Ensure that a table and its column families exist. Checks for existence of the table and
+     * creates it if it does not exist. Also checks for existence of the column families within
+     * the table and creates them if they do not exist.
+     *
+     * @param tableId
+     *   Id of the table to ensure.
+     * @param columnFamilies
+     *   A list of column family names along with the desired GCRule.
+     */
+    def ensureTableWithGcRules(
+      projectId: String,
+      instanceId: String,
+      tableId: String,
+      columnFamilies: Iterable[(String, Option[GCRule])],
+      createDisposition: CreateDisposition = CreateDisposition.Default
+    ): Unit = ensureTableImpl(projectId, instanceId, tableId, columnFamilies, createDisposition)
+
+    /**
+     * Shared implementation: ensure the table exists per the create disposition, then add or
+     * update its column families.
+     *
+     * @param columnFamilies
+     *   A list of column family names along with an optional GCRule.
+ */ + private def ensureTableImpl( + projectId: String, + instanceId: String, + tableId: String, + columnFamilies: Iterable[(String, Option[GCRule])], + createDisposition: CreateDisposition + ): Unit = { + logger.info("Ensuring tables and column families exist in instance {}", instanceId) + + val client = getOrCreateClient(projectId, instanceId) + val existingTables = fetchTables(client) + val exists = existingTables.contains(tableId) + if (exists) { + logger.info("Table {} exists", tableId) + } else { + createDisposition match { + case CreateDisposition.CreateIfNeeded => + logger.info("Creating table {}", tableId) + client.createTable(CreateTableRequest.of(tableId)) + + val table = client.getTable(tableId) + val cf = table.getColumnFamilies.asScala.map(c => c.getId -> c).toMap + + val modifyRequest = columnFamilies.foldLeft(ModifyColumnFamiliesRequest.of(tableId)) { + case (mr, (id, gcrOpt)) => + val gcRule = gcrOpt.getOrElse(GCRules.GCRULES.defaultRule()) + cf.get(id) match { + case None => mr.addFamily(id, gcRule) + case Some(_) => mr.updateFamily(id, gcRule) + } + } + + logger.info("Modifying column families for table {}", tableId) + client.modifyFamilies(modifyRequest) + case CreateDisposition.Never => + throw new IllegalStateException(s"Table $tableId does not exist") + } + } + } + + /** + * Permanently deletes a row range from the specified table that match a particular prefix. + * + * @param table + * table name + * @param rowPrefix + * row key prefix + */ + def dropRowRange( + projectId: String, + instanceId: String, + tableId: String, + rowPrefix: String + ): Unit = { + val client = getOrCreateClient(projectId, instanceId) + client.dropRowRange(tableId, rowPrefix) + } + } + + object Instance { + private[bigtable] val clients: concurrent.Map[String, BigtableInstanceAdminClient] = + TrieMap.empty + private def getOrCreateClient(projectId: String): BigtableInstanceAdminClient = { + clients.getOrElseUpdate( + projectId, + BigtableInstanceAdminClient.create(projectId) + ) + } + + /** + * Updates clusters within the specified Bigtable instance to a specified number of nodes. + * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the + * end to lower costs yet still get high throughput during bulk ingests/dumps. + * + * @param numberOfNodes + * New number of nodes in the cluster + * @param sleepDuration + * How long to sleep after updating the number of nodes. Google recommends at least 20 minutes + * before the new nodes are fully functional + */ + def updateNumberOfBigtableNodes( + projectId: String, + instanceId: String, + clusterIds: Set[String], + numberOfNodes: Int, + sleepDuration: Duration + ): Unit = { + val client = getOrCreateClient(projectId) + val ids: Iterable[String] = if (clusterIds.isEmpty) { + client.listClusters(instanceId).asScala.map(_.getId) + } else { + clusterIds + } + + ids.foreach { clusterId => + logger.info("Updating number of nodes to {} for cluster {}", numberOfNodes, clusterId) + client.resizeCluster(instanceId, clusterId, numberOfNodes) + } + + if (sleepDuration.isLongerThan(Duration.ZERO)) { + logger.info("Sleeping for {} after update", sleepDuration) + Thread.sleep(sleepDuration.getMillis) + } + } + + /** + * Get size of all clusters for specified Bigtable instance. 
+ * + * @return + * map of clusterId to its number of nodes + */ + def getClusterSizes(projectId: String, instanceId: String): Map[String, Int] = { + val client = getOrCreateClient(projectId) + client.listClusters(instanceId).asScala.map(c => c.getId -> c.getServeNodes).toMap + } + } +} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala index 1cabe73a7f..3425dd87ca 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/BigTableIO.scala @@ -18,54 +18,50 @@ package com.spotify.scio.bigtable import com.google.bigtable.v2._ -import com.google.cloud.bigtable.config.BigtableOptions import com.google.protobuf.ByteString import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} -import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT, TestIO} -import com.spotify.scio.util.Functions -import com.spotify.scio.values.SCollection +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT, TestIO, WriteResultIO} +import com.spotify.scio.values.{SCollection, SideOutput, SideOutputCollections} +import org.apache.beam.sdk.io.gcp.bigtable.{BigtableWriteResult, BigtableWriteResultCoder} import org.apache.beam.sdk.io.gcp.{bigtable => beam} import org.apache.beam.sdk.io.range.ByteKeyRange -import org.apache.beam.sdk.values.KV +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} +import org.apache.beam.sdk.values.{KV, PCollectionTuple} import org.joda.time.Duration -import org.typelevel.scalaccompat.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.util.chaining._ -sealed trait BigtableIO[T] extends ScioIO[T] { +sealed abstract class BigtableIO[T](projectId: String, instanceId: String, tableId: String) + extends ScioIO[T] { final override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] + override def testId: String = s"BigtableIO($projectId:$instanceId:$tableId)" } object BigtableIO { final def apply[T](projectId: String, instanceId: String, tableId: String): BigtableIO[T] = - new BigtableIO[T] with TestIO[T] { - override def testId: String = - s"BigtableIO($projectId\t$instanceId\t$tableId)" - } + new BigtableIO[T](projectId, instanceId, tableId) with TestIO[T] } -final case class BigtableRead(bigtableOptions: BigtableOptions, tableId: String) - extends BigtableIO[Row] { +final case class BigtableRead(projectId: String, instanceId: String, tableId: String) + extends BigtableIO[Row](projectId, instanceId, tableId) { override type ReadP = BigtableRead.ReadParam override type WriteP = Nothing - override def testId: String = - s"BigtableIO(${bigtableOptions.getProjectId}\t${bigtableOptions.getInstanceId}\t$tableId)" - override protected def read(sc: ScioContext, params: ReadP): SCollection[Row] = { val coder = CoderMaterializer.beam(sc, Coder.protoMessageCoder[Row]) - val opts = bigtableOptions // defeat closure val read = beam.BigtableIO .read() - .withProjectId(bigtableOptions.getProjectId) - .withInstanceId(bigtableOptions.getInstanceId) + .withProjectId(projectId) + .withInstanceId(instanceId) .withTableId(tableId) - .withBigtableOptionsConfigurator(Functions.serializableFn(_ => opts.toBuilder)) - .withMaxBufferElementCount(params.maxBufferElementCount.map(Int.box).orNull) - .pipe(r => if (params.keyRanges.isEmpty) r else 
r.withKeyRanges(params.keyRanges.asJava)) - .pipe(r => Option(params.rowFilter).fold(r)(r.withRowFilter)): @nowarn("cat=deprecation") + .withKeyRanges(params.keyRanges.asJava) + .pipe(r => Option(params.rowFilter).fold(r)(r.withRowFilter)) + .pipe(r => params.maxBufferElementCount.fold(r)(r.withMaxBufferElementCount(_))) + .pipe(r => Option(params.appProfileId).fold(r)(r.withAppProfileId)) + .pipe(r => Option(params.attemptTimeout).fold(r)(r.withAttemptTimeout)) + .pipe(r => Option(params.operationTimeout).fold(r)(r.withOperationTimeout)) sc.applyTransform(read).setCoder(coder) } @@ -81,40 +77,33 @@ final case class BigtableRead(bigtableOptions: BigtableOptions, tableId: String) object BigtableRead { object ReadParam { - val DefaultKeyRanges: Seq[ByteKeyRange] = Seq.empty[ByteKeyRange] + val DefaultKeyRanges: Seq[ByteKeyRange] = Seq(ByteKeyRange.ALL_KEYS) val DefaultRowFilter: RowFilter = null val DefaultMaxBufferElementCount: Option[Int] = None - - def apply(keyRange: ByteKeyRange) = new ReadParam(Seq(keyRange)) - - def apply(keyRange: ByteKeyRange, rowFilter: RowFilter): ReadParam = - new ReadParam(Seq(keyRange), rowFilter) + val DefaultAppProfileId: String = null + val DefaultAttemptTimeout: Duration = null + val DefaultOperationTimeout: Duration = null } final case class ReadParam private ( keyRanges: Seq[ByteKeyRange] = ReadParam.DefaultKeyRanges, rowFilter: RowFilter = ReadParam.DefaultRowFilter, - maxBufferElementCount: Option[Int] = ReadParam.DefaultMaxBufferElementCount + maxBufferElementCount: Option[Int] = ReadParam.DefaultMaxBufferElementCount, + appProfileId: String = ReadParam.DefaultAppProfileId, + attemptTimeout: Duration = ReadParam.DefaultAttemptTimeout, + operationTimeout: Duration = ReadParam.DefaultOperationTimeout ) - - final def apply(projectId: String, instanceId: String, tableId: String): BigtableRead = { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - BigtableRead(bigtableOptions, tableId) - } } -final case class BigtableWrite[T <: Mutation](bigtableOptions: BigtableOptions, tableId: String) - extends BigtableIO[(ByteString, Iterable[T])] { - override type ReadP = Nothing +final case class BigtableWrite[T <: Mutation]( + projectId: String, + instanceId: String, + tableId: String +) extends BigtableIO[(ByteString, Iterable[T])](projectId, instanceId, tableId) + with WriteResultIO[(ByteString, Iterable[T])] { + override type ReadP = Unit override type WriteP = BigtableWrite.WriteParam - override def testId: String = - s"BigtableIO(${bigtableOptions.getProjectId}\t${bigtableOptions.getInstanceId}\t$tableId)" - override protected def read( sc: ScioContext, params: ReadP @@ -123,33 +112,33 @@ final case class BigtableWrite[T <: Mutation](bigtableOptions: BigtableOptions, "BigtableWrite is write-only, use Row to read from Bigtable" ) - override protected def write( + override protected def writeWithResult( data: SCollection[(ByteString, Iterable[T])], params: WriteP - ): Tap[Nothing] = { - val sink = - params match { - case BigtableWrite.Default => - val opts = bigtableOptions // defeat closure - beam.BigtableIO - .write() - .withProjectId(bigtableOptions.getProjectId) - .withInstanceId(bigtableOptions.getInstanceId) - .withTableId(tableId) - .withBigtableOptionsConfigurator( - Functions.serializableFn(_ => opts.toBuilder) - ): @nowarn("cat=deprecation") - case BigtableWrite.Bulk(numOfShards, flushInterval) => - new BigtableBulkWriter(tableId, bigtableOptions, numOfShards, flushInterval) - } - 
data.transform_("Bigtable write") { coll => + ): (Tap[Nothing], SideOutputCollections) = { + val t = beam.BigtableIO + .write() + .withProjectId(projectId) + .withInstanceId(instanceId) + .withTableId(tableId) + .withFlowControl(params.flowControl) + .pipe(w => Option(params.errorHandler).fold(w)(w.withErrorHandler)) + .pipe(w => Option(params.appProfileId).fold(w)(w.withAppProfileId)) + .pipe(w => Option(params.attemptTimeout).fold(w)(w.withAttemptTimeout)) + .pipe(w => Option(params.operationTimeout).fold(w)(w.withOperationTimeout)) + .pipe(w => params.maxBytesPerBatch.fold(w)(w.withMaxBytesPerBatch)) + .pipe(w => params.maxElementsPerBatch.fold(w)(w.withMaxElementsPerBatch)) + .pipe(w => params.maxOutstandingBytes.fold(w)(w.withMaxOutstandingBytes)) + .pipe(w => params.maxOutstandingElements.fold(w)(w.withMaxOutstandingElements)) + .withWriteResults() + + val result = data.transform_("Bigtable write") { coll => coll - .map { case (key, value) => - KV.of(key, value.asJava.asInstanceOf[java.lang.Iterable[Mutation]]) - } - .applyInternal(sink) + .map { case (key, mutations) => KV.of(key, (mutations: Iterable[Mutation]).asJava) } + .applyInternal(t) } - EmptyTap + val sideOutput = PCollectionTuple.of(BigtableWrite.BigtableWriteResult.tupleTag, result) + (tap(()), SideOutputCollections(sideOutput, data.context)) } override def tap(params: ReadP): Tap[Nothing] = @@ -157,28 +146,34 @@ final case class BigtableWrite[T <: Mutation](bigtableOptions: BigtableOptions, } object BigtableWrite { - sealed trait WriteParam - object Default extends WriteParam - object Bulk { - private[bigtable] val DefaultFlushInterval = Duration.standardSeconds(1) + // TODO should this be here ? + implicit val bigtableWriteResultCoder: Coder[BigtableWriteResult] = + Coder.beam(new BigtableWriteResultCoder) + + lazy val BigtableWriteResult: SideOutput[BigtableWriteResult] = SideOutput() + + object WriteParam { + val DefaultFlowControl: Boolean = false + val DefaultErrorHandler: ErrorHandler[BadRecord, _] = null + val DefaultAppProfileId: String = null + val DefaultAttemptTimeout: Duration = null + val DefaultOperationTimeout: Duration = null + val DefaultMaxBytesPerBatch: Option[Long] = None + val DefaultMaxElementsPerBatch: Option[Long] = None + val DefaultMaxOutstandingBytes: Option[Long] = None + val DefaultMaxOutstandingElements: Option[Long] = None } - final case class Bulk private ( - numOfShards: Int, - flushInterval: Duration = Bulk.DefaultFlushInterval - ) extends WriteParam - - final def apply[T <: Mutation]( - projectId: String, - instanceId: String, - tableId: String - ): BigtableWrite[T] = { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - BigtableWrite[T](bigtableOptions, tableId) - } + final case class WriteParam private ( + flowControl: Boolean = WriteParam.DefaultFlowControl, + errorHandler: ErrorHandler[BadRecord, _] = WriteParam.DefaultErrorHandler, + appProfileId: String = WriteParam.DefaultAppProfileId, + attemptTimeout: Duration = WriteParam.DefaultAttemptTimeout, + operationTimeout: Duration = WriteParam.DefaultOperationTimeout, + maxBytesPerBatch: Option[Long] = WriteParam.DefaultMaxBytesPerBatch, + maxElementsPerBatch: Option[Long] = WriteParam.DefaultMaxElementsPerBatch, + maxOutstandingBytes: Option[Long] = WriteParam.DefaultMaxOutstandingBytes, + maxOutstandingElements: Option[Long] = WriteParam.DefaultMaxOutstandingElements + ) } diff --git 
a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/TableAdmin.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/TableAdmin.scala deleted file mode 100644 index 1a316382e2..0000000000 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/TableAdmin.scala +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Copyright 2019 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.bigtable - -import java.nio.charset.Charset - -import com.google.bigtable.admin.v2._ -import com.google.bigtable.admin.v2.ModifyColumnFamiliesRequest.Modification -import com.google.cloud.bigtable.config.BigtableOptions -import com.google.cloud.bigtable.grpc._ -import com.google.protobuf.{ByteString, Duration => ProtoDuration} -import org.joda.time.Duration -import org.slf4j.{Logger, LoggerFactory} - -import scala.jdk.CollectionConverters._ -import scala.util.Try - -/** Bigtable Table Admin API helper commands. */ -object TableAdmin { - - sealed trait CreateDisposition - object CreateDisposition { - case object Never extends CreateDisposition - case object CreateIfNeeded extends CreateDisposition - val default = CreateIfNeeded - } - - private val log: Logger = LoggerFactory.getLogger(TableAdmin.getClass) - - private def adminClient[A]( - bigtableOptions: BigtableOptions - )(f: BigtableTableAdminClient => A): Try[A] = { - val channel = - ChannelPoolCreator.createPool(bigtableOptions) - val executorService = - BigtableSessionSharedThreadPools.getInstance().getRetryExecutor - val client = new BigtableTableAdminGrpcClient(channel, executorService, bigtableOptions) - - val result = Try(f(client)) - channel.shutdownNow() - result - } - - /** - * Retrieves a set of tables from the given instancePath. - * - * @param client - * Client for calling Bigtable. - * @param instancePath - * String of the form "projects/$project/instances/$instance". - * @return - */ - private def fetchTables(client: BigtableTableAdminClient, instancePath: String): Set[String] = - client - .listTables( - ListTablesRequest - .newBuilder() - .setParent(instancePath) - .build() - ) - .getTablesList - .asScala - .map(_.getName) - .toSet - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names. - */ - def ensureTables( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[String]], - createDisposition: CreateDisposition = CreateDisposition.default - ): Unit = { - val tcf = tablesAndColumnFamilies.iterator.map { case (k, l) => - k -> l.map(_ -> None) - }.toMap - ensureTablesImpl(bigtableOptions, tcf, createDisposition).get - } - - /** - * Ensure that tables and column families exist. 
Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names along with the desired cell expiration. Cell expiration is the duration before which - * garbage collection of a cell may occur. Note: minimum granularity is one second. - */ - def ensureTablesWithExpiration( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[(String, Option[Duration])]], - createDisposition: CreateDisposition = CreateDisposition.default - ): Unit = { - // Convert Duration to GcRule - val x = tablesAndColumnFamilies.iterator.map { case (k, v) => - k -> v.map { case (columnFamily, duration) => - (columnFamily, duration.map(gcRuleFromDuration)) - } - }.toMap - - ensureTablesImpl(bigtableOptions, x, createDisposition).get - } - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names along with the desired GcRule. - */ - def ensureTablesWithGcRules( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[(String, Option[GcRule])]], - createDisposition: CreateDisposition = CreateDisposition.default - ): Unit = - ensureTablesImpl(bigtableOptions, tablesAndColumnFamilies, createDisposition).get - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names. - */ - private def ensureTablesImpl( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[(String, Option[GcRule])]], - createDisposition: CreateDisposition - ): Try[Unit] = { - val project = bigtableOptions.getProjectId - val instance = bigtableOptions.getInstanceId - val instancePath = s"projects/$project/instances/$instance" - - log.info("Ensuring tables and column families exist in instance {}", instance) - - adminClient(bigtableOptions) { client => - val existingTables = fetchTables(client, instancePath) - - tablesAndColumnFamilies.foreach { case (table, columnFamilies) => - val tablePath = s"$instancePath/tables/$table" - - val exists = existingTables.contains(tablePath) - createDisposition match { - case _ if exists => - log.info("Table {} exists", table) - case CreateDisposition.CreateIfNeeded => - log.info("Creating table {}", table) - client.createTable( - CreateTableRequest - .newBuilder() - .setParent(instancePath) - .setTableId(table) - .build() - ) - case CreateDisposition.Never => - throw new IllegalStateException(s"Table $table does not exist") - } - - ensureColumnFamilies(client, tablePath, columnFamilies, createDisposition) - } - } - } - - /** - * Ensure that column families exist. Checks for existence of column families and creates them if - * they don't exist. 
- * - * @param tablePath - * A full table path that the bigtable API expects, in the form of - * `projects/projectId/instances/instanceId/tables/tableId` - * @param columnFamilies - * A list of column family names. - */ - private def ensureColumnFamilies( - client: BigtableTableAdminClient, - tablePath: String, - columnFamilies: Iterable[(String, Option[GcRule])], - createDisposition: CreateDisposition - ): Unit = - createDisposition match { - case CreateDisposition.CreateIfNeeded => - val tableInfo = - client.getTable(GetTableRequest.newBuilder().setName(tablePath).build) - - val cfList = columnFamilies - .map { case (n, gcRule) => - val cf = tableInfo - .getColumnFamiliesOrDefault(n, ColumnFamily.newBuilder().build()) - .toBuilder - .setGcRule(gcRule.getOrElse(GcRule.getDefaultInstance)) - .build() - - (n, cf) - } - val modifications = - cfList.map { case (n, cf) => - val mod = Modification.newBuilder().setId(n) - if (tableInfo.containsColumnFamilies(n)) { - mod.setUpdate(cf) - } else { - mod.setCreate(cf) - } - mod.build() - } - - log.info( - "Modifying or updating {} column families for table {}", - modifications.size, - tablePath - ) - - if (modifications.nonEmpty) { - client.modifyColumnFamily( - ModifyColumnFamiliesRequest - .newBuilder() - .setName(tablePath) - .addAllModifications(modifications.asJava) - .build - ) - } - () - case CreateDisposition.Never => - () - } - - private def gcRuleFromDuration(duration: Duration): GcRule = { - val protoDuration = ProtoDuration.newBuilder.setSeconds(duration.getStandardSeconds) - GcRule.newBuilder.setMaxAge(protoDuration).build - } - - /** - * Permanently deletes a row range from the specified table that match a particular prefix. - * - * @param table - * table name - * @param rowPrefix - * row key prefix - */ - def dropRowRange(bigtableOptions: BigtableOptions, table: String, rowPrefix: String): Try[Unit] = - adminClient(bigtableOptions) { client => - val project = bigtableOptions.getProjectId - val instance = bigtableOptions.getInstanceId - val instancePath = s"projects/$project/instances/$instance" - val tablePath = s"$instancePath/tables/$table" - - dropRowRange(tablePath, rowPrefix, client) - } - - /** - * Permanently deletes a row range from the specified table that match a particular prefix. 
- * - * @param tablePath - * A full table path that the bigtable API expects, in the form of - * `projects/projectId/instances/instanceId/tables/tableId` - * @param rowPrefix - * row key prefix - */ - private def dropRowRange( - tablePath: String, - rowPrefix: String, - client: BigtableTableAdminClient - ): Unit = { - val request = DropRowRangeRequest - .newBuilder() - .setName(tablePath) - .setRowKeyPrefix(ByteString.copyFrom(rowPrefix, Charset.forName("UTF-8"))) - .build() - - client.dropRowRange(request) - } -} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala index f918df27bb..37f9a9b385 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala @@ -18,13 +18,12 @@ package com.spotify.scio.bigtable.syntax import com.google.bigtable.v2._ -import com.google.cloud.bigtable.config.BigtableOptions import com.google.protobuf.ByteString import com.spotify.scio.io.ClosedTap import com.spotify.scio.values.SCollection import org.joda.time.Duration - import com.spotify.scio.bigtable.BigtableWrite +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} /** * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Bigtable methods. @@ -34,26 +33,34 @@ final class SCollectionMutationOps[T <: Mutation]( ) { /** Save this SCollection as a Bigtable table. Note that elements must be of type `Mutation`. */ - def saveAsBigtable(projectId: String, instanceId: String, tableId: String): ClosedTap[Nothing] = - self.write(BigtableWrite[T](projectId, instanceId, tableId))(BigtableWrite.Default) - - /** Save this SCollection as a Bigtable table. Note that elements must be of type `Mutation`. */ - def saveAsBigtable(bigtableOptions: BigtableOptions, tableId: String): ClosedTap[Nothing] = - self.write(BigtableWrite[T](bigtableOptions, tableId))(BigtableWrite.Default) - - /** - * Save this SCollection as a Bigtable table. This version supports batching. Note that elements - * must be of type `Mutation`. 
- */ def saveAsBigtable( - bigtableOptions: BigtableOptions, + projectId: String, + instanceId: String, tableId: String, - numOfShards: Int, - flushInterval: Duration = BigtableWrite.Bulk.DefaultFlushInterval - ): ClosedTap[Nothing] = - self.write(BigtableWrite[T](bigtableOptions, tableId))( - BigtableWrite.Bulk(numOfShards, flushInterval) + flowControl: Boolean = BigtableWrite.WriteParam.DefaultFlowControl, + errorHandler: ErrorHandler[BadRecord, _] = BigtableWrite.WriteParam.DefaultErrorHandler, + appProfileId: String = BigtableWrite.WriteParam.DefaultAppProfileId, + attemptTimeout: Duration = BigtableWrite.WriteParam.DefaultAttemptTimeout, + operationTimeout: Duration = BigtableWrite.WriteParam.DefaultOperationTimeout, + maxBytesPerBatch: Option[Long] = BigtableWrite.WriteParam.DefaultMaxBytesPerBatch, + maxElementsPerBatch: Option[Long] = BigtableWrite.WriteParam.DefaultMaxElementsPerBatch, + maxOutstandingBytes: Option[Long] = BigtableWrite.WriteParam.DefaultMaxOutstandingBytes, + maxOutstandingElements: Option[Long] = BigtableWrite.WriteParam.DefaultMaxOutstandingElements + ): ClosedTap[Nothing] = { + val param = BigtableWrite.WriteParam( + flowControl, + errorHandler, + appProfileId, + attemptTimeout, + operationTimeout, + maxBytesPerBatch, + maxElementsPerBatch, + maxOutstandingBytes, + maxOutstandingElements ) + + self.write(BigtableWrite[T](projectId, instanceId, tableId))(param) + } } trait SCollectionSyntax { diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala index cd21ee3f90..6596d0491c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala @@ -17,49 +17,23 @@ package com.spotify.scio.bigtable.syntax -import com.google.bigtable.admin.v2.GcRule -import com.google.bigtable.v2._ -import com.google.cloud.bigtable.config.BigtableOptions +import com.google.bigtable.v2.{Row, RowFilter} +import com.google.cloud.bigtable.admin.v2.models.GCRules.GCRule import com.spotify.scio.ScioContext -import com.spotify.scio.bigtable.BigtableRead -import com.spotify.scio.bigtable.BigtableUtil -import com.spotify.scio.bigtable.TableAdmin +import com.spotify.scio.bigtable.{Admin, BigtableRead} import com.spotify.scio.values.SCollection import org.apache.beam.sdk.io.range.ByteKeyRange import org.joda.time.Duration -import scala.jdk.CollectionConverters._ - object ScioContextOps { - private val DefaultSleepDuration = Duration.standardMinutes(20) - private val DefaultClusterNames: Set[String] = Set.empty + private val DefaultSleepDuration: Duration = Duration.standardMinutes(20) + private val DefaultClusterIds: Set[String] = Set.empty } /** Enhanced version of [[ScioContext]] with Bigtable methods. */ final class ScioContextOps(private val self: ScioContext) extends AnyVal { import ScioContextOps._ - /** Get an SCollection for a Bigtable table. */ - def bigtable( - projectId: String, - instanceId: String, - tableId: String, - keyRange: ByteKeyRange, - rowFilter: RowFilter - ): SCollection[Row] = - bigtable(projectId, instanceId, tableId, Seq(keyRange), rowFilter) - - /** Get an SCollection for a Bigtable table. 
*/
-  def bigtable(
-    projectId: String,
-    instanceId: String,
-    tableId: String,
-    keyRange: ByteKeyRange,
-    rowFilter: RowFilter,
-    maxBufferElementCount: Option[Int]
-  ): SCollection[Row] =
-    bigtable(projectId, instanceId, tableId, Seq(keyRange), rowFilter, maxBufferElementCount)
-
   /** Get an SCollection for a Bigtable table. */
   def bigtable(
     projectId: String,
     instanceId: String,
@@ -73,69 +47,25 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
     self.read(BigtableRead(projectId, instanceId, tableId))(parameters)
   }

-  /** Get an SCollection for a Bigtable table. */
-  def bigtable(
-    bigtableOptions: BigtableOptions,
-    tableId: String,
-    keyRange: ByteKeyRange,
-    rowFilter: RowFilter
-  ): SCollection[Row] =
-    bigtable(bigtableOptions, tableId, Seq(keyRange), rowFilter)
-
-  /** Get an SCollection for a Bigtable table. */
-  def bigtable(
-    bigtableOptions: BigtableOptions,
-    tableId: String,
-    keyRange: ByteKeyRange,
-    rowFilter: RowFilter,
-    maxBufferElementCount: Option[Int]
-  ): SCollection[Row] =
-    bigtable(bigtableOptions, tableId, Seq(keyRange), rowFilter, maxBufferElementCount)
-
-  /** Get an SCollection for a Bigtable table. */
-  def bigtable(
-    bigtableOptions: BigtableOptions,
-    tableId: String,
-    keyRanges: Seq[ByteKeyRange],
-    rowFilter: RowFilter
-  ): SCollection[Row] = {
-    val parameters = BigtableRead.ReadParam(keyRanges, rowFilter)
-    self.read(BigtableRead(bigtableOptions, tableId))(parameters)
-  }
-
-  /** Get an SCollection for a Bigtable table. */
-  def bigtable(
-    bigtableOptions: BigtableOptions,
-    tableId: String,
-    keyRanges: Seq[ByteKeyRange],
-    rowFilter: RowFilter,
-    maxBufferElementCount: Option[Int]
-  ): SCollection[Row] = {
-    val parameters = BigtableRead.ReadParam(keyRanges, rowFilter, maxBufferElementCount)
-    self.read(BigtableRead(bigtableOptions, tableId))(parameters)
-  }
-
   /**
    * Updates all clusters within the specified Bigtable instance to a specified number of nodes.
    * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the
    * end to lower costs yet still get high throughput during bulk ingests/dumps.
    *
-   * @param sleepDuration
-   *   How long to sleep after updating the number of nodes. Google recommends at least 20 minutes
-   *   before the new nodes are fully functional
+   * @param numberOfNodes
+   *   Desired number of nodes for the clusters
    */
   def updateNumberOfBigtableNodes(
     projectId: String,
     instanceId: String,
-    numberOfNodes: Int,
-    sleepDuration: Duration = DefaultSleepDuration
+    numberOfNodes: Int
   ): Unit =
     updateNumberOfBigtableNodes(
       projectId,
       instanceId,
+      DefaultClusterIds,
       numberOfNodes,
-      DefaultClusterNames,
-      sleepDuration
+      DefaultSleepDuration
     )

   /**
@@ -143,45 +73,23 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
    * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the
    * end to lower costs yet still get high throughput during bulk ingests/dumps.
    *
+   * @param numberOfNodes
+   *   Desired number of nodes for the clusters
    * @param sleepDuration
    *   How long to sleep after updating the number of nodes. Google recommends at least 20 minutes
    *   before the new nodes are fully functional
-   * @param clusterNames
-   *   Names of clusters to be updated, all if empty
    */
   def updateNumberOfBigtableNodes(
     projectId: String,
     instanceId: String,
     numberOfNodes: Int,
-    clusterNames: Set[String],
-    sleepDuration: Duration
-  ): Unit = {
-    val bigtableOptions = BigtableOptions
-      .builder()
-      .setProjectId(projectId)
-      .setInstanceId(instanceId)
-      .build
-    updateNumberOfBigtableNodes(bigtableOptions, numberOfNodes, clusterNames, sleepDuration)
-  }
-
-  /**
-   * Updates all clusters within the specified Bigtable instance to a specified number of nodes.
-   * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the
-   * end to lower costs yet still get high throughput during bulk ingests/dumps.
-   *
-   * @param sleepDuration
-   *   How long to sleep after updating the number of nodes. Google recommends at least 20 minutes
-   *   before the new nodes are fully functional
-   */
-  def updateNumberOfBigtableNodes(
-    bigtableOptions: BigtableOptions,
-    numberOfNodes: Int,
     sleepDuration: Duration
   ): Unit =
     updateNumberOfBigtableNodes(
-      bigtableOptions,
+      projectId,
+      instanceId,
+      DefaultClusterIds,
       numberOfNodes,
-      DefaultClusterNames,
       sleepDuration
     )

@@ -190,25 +98,29 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
    * Useful for increasing the number of nodes at the beginning of a job and decreasing it at the
    * end to lower costs yet still get high throughput during bulk ingests/dumps.
    *
-   * @param clusterNames
-   *   Names of clusters to be updated, all if empty
+   * @param numberOfNodes
+   *   Desired number of nodes for the clusters
+   * @param clusterIds
+   *   IDs of clusters to be updated; all clusters if empty
    * @param sleepDuration
    *   How long to sleep after updating the number of nodes. Google recommends at least 20 minutes
    *   before the new nodes are fully functional
    */
   def updateNumberOfBigtableNodes(
-    bigtableOptions: BigtableOptions,
+    projectId: String,
+    instanceId: String,
+    clusterIds: Set[String],
     numberOfNodes: Int,
-    clusterNames: Set[String],
-    sleepDuration: Duration
+    sleepDuration: Duration = DefaultSleepDuration
   ): Unit =
     if (!self.isTest) {
       // No need to update the number of nodes in a test
-      BigtableUtil.updateNumberOfBigtableNodes(
-        bigtableOptions,
+      Admin.Instance.updateNumberOfBigtableNodes(
+        projectId,
+        instanceId,
+        clusterIds,
         numberOfNodes,
-        sleepDuration,
-        clusterNames.asJava
+        sleepDuration
       )
     }

@@ -220,12 +132,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
    */
   def getBigtableClusterSizes(projectId: String, instanceId: String): Map[String, Int] =
     if (!self.isTest) {
-      BigtableUtil
-        .getClusterSizes(projectId, instanceId)
-        .asScala
-        .iterator
-        .map { case (k, v) => k -> v.toInt }
-        .toMap
+      Admin.Instance.getClusterSizes(projectId, instanceId)
     } else {
       Map.empty
     }
@@ -239,62 +146,23 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
    * A map of tables and column families. Keys are table names. Values are a list of column family
    * names.
*/ - def ensureTables( - projectId: String, - instanceId: String, - tablesAndColumnFamilies: Map[String, Iterable[String]], - createDisposition: TableAdmin.CreateDisposition - ): Unit = - if (!self.isTest) { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - TableAdmin.ensureTables(bigtableOptions, tablesAndColumnFamilies, createDisposition) - } - - def ensureTables( + def ensureTable( projectId: String, instanceId: String, - tablesAndColumnFamilies: Map[String, Iterable[String]] - ): Unit = ensureTables( - projectId, - instanceId, - tablesAndColumnFamilies, - TableAdmin.CreateDisposition.default - ) - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamilies - * A map of tables and column families. Keys are table names. Values are a list of column family - * names. - */ - def ensureTables( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[String]], - createDisposition: TableAdmin.CreateDisposition + tableId: String, + columnFamilies: Iterable[String], + createDisposition: Admin.Table.CreateDisposition = Admin.Table.CreateDisposition.Default ): Unit = if (!self.isTest) { - TableAdmin.ensureTables(bigtableOptions, tablesAndColumnFamilies, createDisposition) + Admin.Table.ensureTable(projectId, instanceId, tableId, columnFamilies, createDisposition) } - def ensureTables( - bigtableOptions: BigtableOptions, - tablesAndColumnFamilies: Map[String, Iterable[String]] - ): Unit = - ensureTables(bigtableOptions, tablesAndColumnFamilies, TableAdmin.CreateDisposition.default) - /** * Ensure that tables and column families exist. Checks for existence of tables or creates them if * they do not exist. Also checks for existence of column families within each table and creates * them if they do not exist. * - * @param tablesAndColumnFamiliesWithExpiration + * @param columnFamiliesWithExpiration * A map of tables and column families. Keys are table names. Values are a list of column family * names along with the desired cell expiration. Cell expiration is the duration before which * garbage collection of a cell may occur. Note: minimum granularity is second. @@ -302,65 +170,20 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { def ensureTablesWithExpiration( projectId: String, instanceId: String, - tablesAndColumnFamiliesWithExpiration: Map[String, Iterable[(String, Option[Duration])]], - createDisposition: TableAdmin.CreateDisposition - ): Unit = - if (!self.isTest) { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - TableAdmin.ensureTablesWithExpiration( - bigtableOptions, - tablesAndColumnFamiliesWithExpiration, - createDisposition - ) - } - - def ensureTablesWithExpiration( - projectId: String, - instanceId: String, - tablesAndColumnFamiliesWithExpiration: Map[String, Iterable[(String, Option[Duration])]] - ): Unit = ensureTablesWithExpiration( - projectId, - instanceId, - tablesAndColumnFamiliesWithExpiration, - TableAdmin.CreateDisposition.default - ) - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. 
- * - * @param tablesAndColumnFamiliesWithExpiration - * A map of tables and column families. Keys are table names. Values are a list of column family - * names along with the desired cell expiration. Cell expiration is the duration before which - * garbage collection of a cell may occur. Note: minimum granularity is second. - */ - def ensureTablesWithExpiration( - bigtableOptions: BigtableOptions, - tablesAndColumnFamiliesWithExpiration: Map[String, Iterable[(String, Option[Duration])]], - createDisposition: TableAdmin.CreateDisposition + tableId: String, + columnFamiliesWithExpiration: Iterable[(String, Option[Duration])], + createDisposition: Admin.Table.CreateDisposition = Admin.Table.CreateDisposition.Default ): Unit = if (!self.isTest) { - TableAdmin.ensureTablesWithExpiration( - bigtableOptions, - tablesAndColumnFamiliesWithExpiration, + Admin.Table.ensureTablesWithExpiration( + projectId, + instanceId, + tableId, + columnFamiliesWithExpiration, createDisposition ) } - def ensureTablesWithExpiration( - bigtableOptions: BigtableOptions, - tablesAndColumnFamiliesWithExpiration: Map[String, Iterable[(String, Option[Duration])]] - ): Unit = ensureTablesWithExpiration( - bigtableOptions, - tablesAndColumnFamiliesWithExpiration, - TableAdmin.CreateDisposition.default - ) - /** * Ensure that tables and column families exist. Checks for existence of tables or creates them if * they do not exist. Also checks for existence of column families within each table and creates @@ -373,66 +196,19 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { def ensureTablesWithGcRules( projectId: String, instanceId: String, - tablesAndColumnFamiliesWithGcRules: Map[String, Iterable[(String, Option[GcRule])]], - createDisposition: TableAdmin.CreateDisposition - ): Unit = - if (!self.isTest) { - val bigtableOptions = BigtableOptions - .builder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .build - TableAdmin.ensureTablesWithGcRules( - bigtableOptions, - tablesAndColumnFamiliesWithGcRules, - createDisposition - ) - } - - def ensureTablesWithGcRules( - projectId: String, - instanceId: String, - tablesAndColumnFamiliesWithGcRules: Map[String, Iterable[(String, Option[GcRule])]] - ): Unit = ensureTablesWithGcRules( - projectId, - instanceId, - tablesAndColumnFamiliesWithGcRules, - TableAdmin.CreateDisposition.default - ) - - /** - * Ensure that tables and column families exist. Checks for existence of tables or creates them if - * they do not exist. Also checks for existence of column families within each table and creates - * them if they do not exist. - * - * @param tablesAndColumnFamiliesWithGcRule - * A map of tables and column families. Keys are table names. Values are a list of column family - * names along with the desired cell expiration. Cell expiration is the duration before which - * garbage collection of a cell may occur. Note: minimum granularity is second. 
- */ - def ensureTablesWithGcRules( - bigtableOptions: BigtableOptions, - tablesAndColumnFamiliesWithGcRule: Map[String, Iterable[(String, Option[GcRule])]], - createDisposition: TableAdmin.CreateDisposition + tableId: String, + columnFamiliesWithGcRules: Iterable[(String, Option[GCRule])], + createDisposition: Admin.Table.CreateDisposition ): Unit = if (!self.isTest) { - TableAdmin.ensureTablesWithGcRules( - bigtableOptions, - tablesAndColumnFamiliesWithGcRule, + Admin.Table.ensureTableWithGcRules( + projectId, + instanceId, + tableId, + columnFamiliesWithGcRules, createDisposition ) } - - def ensureTablesWithGcRules( - bigtableOptions: BigtableOptions, - tablesAndColumnFamiliesWithGcRule: Map[String, Iterable[(String, Option[GcRule])]] - ): Unit = - ensureTablesWithGcRules( - bigtableOptions, - tablesAndColumnFamiliesWithGcRule, - TableAdmin.CreateDisposition.default - ) - } trait ScioContextSyntax { diff --git a/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableBulkWriterTest.java b/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableBulkWriterTest.java deleted file mode 100644 index 8bad910623..0000000000 --- a/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableBulkWriterTest.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright 2018 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package com.spotify.scio.bigtable; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; - -import com.google.bigtable.v2.Mutation; -import com.google.protobuf.ByteString; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class BigtableBulkWriterTest { - - @Rule public final transient TestPipeline p = TestPipeline.create(); - - private static final Instant baseTime = new Instant(0); - - private static final TypeDescriptor>> BIGTABLE_WRITE_TYPE = - new TypeDescriptor>>() {}; - - @Test - public void testCreateBulkShards() throws Exception { - final List>> expected = new ArrayList<>(); - - final String key1 = "key1"; - final String value1 = "value1"; - final String key2 = "key2"; - final String value2 = "value2"; - final String key3 = "key3"; - final String value3 = "value3"; - final String key4 = "key4"; - final String value4 = "value4"; - - final TimestampedValue>> firstMutation = - makeMutation(key1, value1, Duration.standardMinutes(1)); - - expected.add(firstMutation.getValue()); - - final TimestampedValue>> secondMutation = - makeMutation(key2, value2, Duration.standardMinutes(5)); - expected.add(secondMutation.getValue()); - - final TimestampedValue>> thirdMutation = - makeMutation(key3, value3, Duration.standardMinutes(1)); - expected.add(thirdMutation.getValue()); - - final TimestampedValue>> fourthMutation = - makeMutation(key4, value4, Duration.standardMinutes(4)); - expected.add(fourthMutation.getValue()); - - final Coder>> bigtableCoder = - p.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE); - - final TestStream>> kvTestStream = - TestStream.create(bigtableCoder) - .addElements(firstMutation) - .advanceProcessingTime(Duration.standardMinutes(2)) - .addElements(secondMutation) - .advanceProcessingTime(Duration.standardMinutes(11)) - .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(15))) - // Late elements are always observable within the global window - they arrive before - // the window closes, so they will appear in a pane, even if they arrive after the - // allowed lateness, and are taken into account alongside on-time elements - .addElements(thirdMutation) - .addElements(fourthMutation) - .advanceProcessingTime(Duration.standardMinutes(20)) - .advanceWatermarkToInfinity(); - - final Duration flushInterval = Duration.standardSeconds(1); - final int numOfShard = 1; - final PCollection>>> actual = - p.apply(kvTestStream).apply(new TestPTransform(numOfShard, flushInterval)); - - PAssert.that(actual).inEarlyGlobalWindowPanes().satisfies(new VerifyKVStreamFn(expected)); - - p.run(); - } - - private TimestampedValue>> makeMutation( - String key, String value, Duration baseTimeOffset) { - Instant timestamp = baseTime.plus(baseTimeOffset); - 
ByteString rowKey = ByteString.copyFromUtf8(key); - Iterable mutations = - Collections.singletonList( - Mutation.newBuilder() - .setSetCell(Mutation.SetCell.newBuilder().setValue(ByteString.copyFromUtf8(value))) - .build()); - return TimestampedValue.of(KV.of(rowKey, mutations), timestamp); - } - - /** - * Hepler class to verify output of {@link PCollection} by converting {@link ByteString} to {@link - * String} to able to verify values. - */ - private static class VerifyKVStreamFn - implements SerializableFunction< - Iterable>>>, Void> { - - private final Iterable>> expected; - - private VerifyKVStreamFn(Iterable>> expected) { - this.expected = expected; - } - - @Override - public Void apply(Iterable>>> input) { - verify(input, expected); - return null; - } - - private List>> convertExpected( - final Iterable>> input) { - List>> mutations = new ArrayList<>(); - for (KV> kv : input) { - final String key = kv.getKey().toString(StandardCharsets.UTF_8); - mutations.add(KV.of(key, kv.getValue())); - } - return mutations; - } - - private List>> convertActual( - final Iterable>>> input) { - List>> mutations = new ArrayList<>(); - for (Iterable>> kv : input) { - for (KV> value : kv) { - final String key = value.getKey().toString(StandardCharsets.UTF_8); - mutations.add(KV.of(key, value.getValue())); - } - } - return mutations; - } - - private void verify( - final Iterable>>> input, - final Iterable>> expected) { - final List>> actual = convertActual(input); - final List>> expectedValues = convertExpected(expected); - - final KV[] kvs = expectedValues.toArray(new KV[0]); - - assertThat(actual, containsInAnyOrder(kvs)); - } - } - - /** Hepler to test createBulkShards. */ - private static class TestPTransform - extends PTransform< - PCollection>>, - PCollection>>>> { - - private final int numOfShards; - private final Duration flushInterval; - - private TestPTransform(int numOfShards, Duration flushInterval) { - this.numOfShards = numOfShards; - this.flushInterval = flushInterval; - } - - @Override - public PCollection>>> expand( - PCollection>> input) { - return BigtableBulkWriter.createBulkShards(input, numOfShards, flushInterval); - } - } -} diff --git a/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableUtilTest.java b/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableUtilTest.java deleted file mode 100644 index f47ece0330..0000000000 --- a/scio-google-cloud-platform/src/test/java/com/spotify/scio/bigtable/BigtableUtilTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2022 Spotify AB - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.spotify.scio.bigtable; - -import org.junit.Assert; -import org.junit.Test; - -public class BigtableUtilTest { - - @Test - public void shorterNameTest() { - Assert.assertEquals( - BigtableUtil.shorterName( - "/projects/scio-test/instances/test-instance/clusters/sample-cluster"), - "sample-cluster"); - - Assert.assertEquals(BigtableUtil.shorterName("simple-name-cluster"), "simple-name-cluster"); - } -} diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigTableBatchDoFnTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigTableBatchDoFnTest.scala index fd579b42b9..c753f65327 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigTableBatchDoFnTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigTableBatchDoFnTest.scala @@ -17,8 +17,9 @@ package com.spotify.scio.bigtable +import com.google.cloud.bigtable.data.v2.BigtableDataClient + import java.util.concurrent.{CompletionException, ConcurrentLinkedQueue} -import com.google.cloud.bigtable.grpc.BigtableSession import com.google.common.cache.{Cache, CacheBuilder} import com.google.common.util.concurrent.{Futures, ListenableFuture} import com.spotify.scio.testing._ @@ -82,9 +83,9 @@ class TestBigtableBatchDoFn BigtableBatchDoFnTest.batchResponse, BigtableBatchDoFnTest.idExtractor ) { - override def newClient(): BigtableSession = null + override def newClient(): BigtableDataClient = null override def asyncLookup( - session: BigtableSession, + client: BigtableDataClient, input: List[Int] ): ListenableFuture[List[String]] = Futures.immediateFuture(input.map(_.toString)) @@ -100,10 +101,10 @@ class TestCachingBigtableBatchDoFn 100, new TestCacheBatchSupplier ) { - override def newClient(): BigtableSession = null + override def newClient(): BigtableDataClient = null override def asyncLookup( - session: BigtableSession, + client: BigtableDataClient, input: List[Int] ): ListenableFuture[List[String]] = { input.foreach(BigtableBatchDoFnTest.queue.add) @@ -119,9 +120,9 @@ class TestFailingBigtableBatchDoFn BigtableBatchDoFnTest.batchResponse, BigtableBatchDoFnTest.idExtractor ) { - override def newClient(): BigtableSession = null + override def newClient(): BigtableDataClient = null override def asyncLookup( - session: BigtableSession, + client: BigtableDataClient, input: List[Int] ): ListenableFuture[List[String]] = if (input.size % 2 == 0) { diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala index f6d991cbaf..b7c064818e 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigtable/BigtableDoFnTest.scala @@ -17,17 +17,15 @@ package com.spotify.scio.bigtable -import com.google.cloud.bigtable.config.BigtableOptions - -import java.util.concurrent.ConcurrentLinkedQueue -import com.google.cloud.bigtable.grpc.BigtableSession +import com.google.api.core.{ApiFuture, ApiFutures} +import com.google.cloud.bigtable.data.v2.BigtableDataClient import com.google.common.cache.{Cache, CacheBuilder} -import com.google.common.util.concurrent.{Futures, ListenableFuture} import com.spotify.scio.testing._ import com.spotify.scio.transforms.BaseAsyncLookupDoFn.CacheSupplier import com.spotify.scio.transforms.JavaAsyncConverters._ import 
com.spotify.scio.util.TransformingCache.SimpleTransformingCache +import java.util.concurrent.ConcurrentLinkedQueue import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success} @@ -40,6 +38,7 @@ class BigtableDoFnTest extends PipelineSpec { } it should "work with cache" in { + BigtableDoFnTest.queue.clear() val fn = new TestCachingBigtableDoFn val output = runWithData((1 to 10) ++ (6 to 15))(_.parDo(fn)) .map(kv => (kv.getKey, kv.getValue.get())) @@ -68,32 +67,27 @@ object BigtableDoFnTest { val queue: ConcurrentLinkedQueue[Int] = new ConcurrentLinkedQueue[Int]() } -class TestBigtableDoFn extends BigtableDoFn[Int, String](BigtableOptions.getDefaultOptions) { - override def newClient(): BigtableSession = null - override def asyncLookup(session: BigtableSession, input: Int): ListenableFuture[String] = - Futures.immediateFuture(input.toString) +class TestBigtableDoFn extends BigtableDoFn[Int, String](null) { + override def newClient(): BigtableDataClient = null + override def asyncLookup(client: BigtableDataClient, input: Int): ApiFuture[String] = + ApiFutures.immediateFuture(input.toString) } -class TestCachingBigtableDoFn - extends BigtableDoFn[Int, String]( - BigtableOptions.getDefaultOptions, - 100, - new TestCacheSupplier - ) { - override def newClient(): BigtableSession = null - override def asyncLookup(session: BigtableSession, input: Int): ListenableFuture[String] = { +class TestCachingBigtableDoFn extends BigtableDoFn[Int, String](null, 100, new TestCacheSupplier) { + override def newClient(): BigtableDataClient = null + override def asyncLookup(client: BigtableDataClient, input: Int): ApiFuture[String] = { BigtableDoFnTest.queue.add(input) - Futures.immediateFuture(input.toString) + ApiFutures.immediateFuture(input.toString) } } -class TestFailingBigtableDoFn extends BigtableDoFn[Int, String](BigtableOptions.getDefaultOptions) { - override def newClient(): BigtableSession = null - override def asyncLookup(session: BigtableSession, input: Int): ListenableFuture[String] = +class TestFailingBigtableDoFn extends BigtableDoFn[Int, String](null) { + override def newClient(): BigtableDataClient = null + override def asyncLookup(client: BigtableDataClient, input: Int): ApiFuture[String] = if (input % 2 == 0) { - Futures.immediateFuture("success" + input) + ApiFutures.immediateFuture("success" + input) } else { - Futures.immediateFailedFuture(new RuntimeException("failure" + input)) + ApiFutures.immediateFailedFuture(new RuntimeException("failure" + input)) } } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala index ddf4c41360..0184b0a4ed 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/coders/instance/kryo/GcpSerializerTest.scala @@ -19,10 +19,9 @@ package com.spotify.scio.coders.instance.kryo import com.google.api.gax.grpc.GrpcStatusCode import com.google.api.gax.rpc.InternalException import com.google.cloud.bigtable.data.v2.models.MutateRowsException -import com.google.cloud.bigtable.grpc.scanner.BigtableRetriesExhaustedException import com.spotify.scio.coders.instances.kryo.GrpcSerializerTest._ import io.grpc.Status.Code -import io.grpc.{Metadata, Status, StatusRuntimeException} +import io.grpc.{Status, StatusRuntimeException} import org.scalactic.Equality import 
org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
@@ -31,13 +30,6 @@ import scala.jdk.CollectionConverters._

 object GcpSerializerTest {

-  implicit val eqBigtableRetriesExhaustedException: Equality[BigtableRetriesExhaustedException] = {
-    case (a: BigtableRetriesExhaustedException, b: BigtableRetriesExhaustedException) =>
-      a.getMessage == b.getMessage &&
-      eqCause.areEqual(a.getCause, b.getCause)
-    case _ => false
-  }
-
   implicit val eqMutateRowsException: Equality[MutateRowsException] = {
     case (a: MutateRowsException, b: MutateRowsException) =>
       eqCause.areEqual(a.getCause, b.getCause) &&
@@ -59,16 +51,6 @@ class GcpSerializerTest extends AnyFlatSpec with Matchers {
   import GcpSerializerTest._
   import com.spotify.scio.testing.CoderAssertions._

-  "BigtableRetriesExhaustedException" should "roundtrip" in {
-    val metadata = new Metadata()
-    metadata.put(Metadata.Key.of[String]("k", Metadata.ASCII_STRING_MARSHALLER), "v")
-    val cause = new StatusRuntimeException(
-      Status.OK.withCause(new RuntimeException("bar")).withDescription("bar"),
-      metadata
-    )
-    new BigtableRetriesExhaustedException("Error", cause) coderShould roundtrip()
-  }
-
   "MutateRowsExceptionSerializer" should "roundtrip" in {
     val cause = new StatusRuntimeException(Status.OK)
     val code = GrpcStatusCode.of(Status.OK.getCode)
diff --git a/site/src/main/paradox/FAQ.md b/site/src/main/paradox/FAQ.md
index 2fe9536556..4b791ee56f 100644
--- a/site/src/main/paradox/FAQ.md
+++ b/site/src/main/paradox/FAQ.md
@@ -365,18 +365,11 @@ Datastore `Entity` class is actually generated from @github[Protobuf](/scio-exam

 #### How do I throttle Bigtable writes?

-Currently, Dataflow autoscaling may not work well with large writes BigtableIO. Specifically It does not take into account Bigtable IO rate limits and may scale up more workers and end up hitting the limit and eventually fail the job. As a workaround, you can enable throttling for Bigtable writes in Scio 0.4.0-alpha2 or later.
+To avoid overloading the cluster while still triggering autoscaling when available, you can enable flow control for Bigtable writes in Scio 0.15.0 or later.

-```scala mdoc:reset:invisible
-val btProjectId = ""
-val btInstanceId = ""
-val btTableId = ""
-```
-
-```scala mdoc:silent
+```scala mdoc:reset:silent
 import com.spotify.scio.values._
 import com.spotify.scio.bigtable._
-import com.google.cloud.bigtable.config.{BigtableOptions, BulkOptions}
 import com.google.bigtable.v2.Mutation
 import com.google.protobuf.ByteString

@@ -385,16 +378,10 @@ def main(cmdlineArgs: Array[String]): Unit = {

   val data: SCollection[(ByteString, Iterable[Mutation])] = ???

-  val btOptions =
-    BigtableOptions.builder()
-      .setProjectId(btProjectId)
-      .setInstanceId(btInstanceId)
-      .setBulkOptions(BulkOptions.builder()
-        .enableBulkMutationThrottling()
-        .setBulkMutationRpcTargetMs(10) // lower latency threshold, default is 100
-        .build())
-      .build()
-  data.saveAsBigtable(btOptions, btTableId)
+  val btProjectId: String = ???
+  val btInstanceId: String = ???
+  val btTableId: String = ???
+  data.saveAsBigtable(btProjectId, btInstanceId, btTableId, flowControl = true)
   // ...
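  // Flow control is not the only knob on the new writer. A sketch of the other
  // options — parameter names follow the new saveAsBigtable signature introduced
  // in this change; the app profile id and the concrete limits are illustrative
  // examples, not recommendations:
  import org.joda.time.Duration
  data.saveAsBigtable(
    btProjectId,
    btInstanceId,
    btTableId,
    flowControl = true,                             // let Bigtable throttle the writers
    appProfileId = "batch-writes",                  // hypothetical app profile for batch traffic
    attemptTimeout = Duration.standardSeconds(30),  // timeout for a single write attempt
    operationTimeout = Duration.standardMinutes(5), // overall timeout across retries
    maxElementsPerBatch = Some(1000L),              // cap mutations per MutateRows request
    maxOutstandingElements = Some(100000L)          // cap total in-flight mutations
  )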
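  // Reads take the same project/instance/table coordinates plus the options that
  // BigtableRead.ReadParam now carries (key ranges, row filter, buffering and
  // timeouts). A minimal sketch, assuming the elided lines above created the usual
  // `sc` ScioContext and that the keyword parameters mirror ReadParam's fields:
  import com.google.bigtable.v2.{Row, RowFilter}
  import org.apache.beam.sdk.io.range.ByteKeyRange
  val rows: SCollection[Row] = sc.bigtable(
    btProjectId,
    btInstanceId,
    btTableId,
    keyRanges = Seq(ByteKeyRange.ALL_KEYS),                                    // scan the whole table
    rowFilter = RowFilter.newBuilder().setCellsPerColumnLimitFilter(1).build() // latest cell per column
  )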
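  // Cluster and table administration moved from BigtableUtil/TableAdmin onto
  // ScioContext, backed by the new Admin object. A sketch of resizing around a
  // bulk load; the node count is illustrative, and the default sleep after
  // resizing is 20 minutes so the new nodes become fully functional:
  val originalSizes: Map[String, Int] =
    sc.getBigtableClusterSizes(btProjectId, btInstanceId) // remember sizes to scale back down later
  sc.updateNumberOfBigtableNodes(btProjectId, btInstanceId, numberOfNodes = 10)
  sc.ensureTable(btProjectId, btInstanceId, btTableId, columnFamilies = List("cf"))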