-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added uploadTypedRows to BigQuery client #5218
base: main
Are you sure you want to change the base?
Changes from 5 commits
9f91e0d
9960886
17dcd89
9d4e9b6
a2dbae0
08e180c
cb0cb05
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -18,15 +18,22 @@ | |||||
package com.spotify.scio.bigquery.client | ||||||
|
||||||
import com.google.api.services.bigquery.model._ | ||||||
import com.google.cloud.storage.{BlobId, BlobInfo, Storage} | ||||||
import com.spotify.scio.bigquery.client.BigQuery.Client | ||||||
import com.spotify.scio.bigquery.{BigQueryUtil, CREATE_IF_NEEDED, WRITE_APPEND} | ||||||
import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation | ||||||
import com.spotify.scio.bigquery.{BigQueryType, BigQueryUtil, CREATE_IF_NEEDED, WRITE_APPEND} | ||||||
import org.apache.avro.file.DataFileWriter | ||||||
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} | ||||||
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition} | ||||||
import org.apache.beam.sdk.io.gcp.{bigquery => bq} | ||||||
import org.apache.commons.lang3.RandomStringUtils | ||||||
import org.slf4j.LoggerFactory | ||||||
|
||||||
import java.io.{ByteArrayInputStream, ByteArrayOutputStream} | ||||||
import scala.annotation.nowarn | ||||||
import scala.jdk.CollectionConverters._ | ||||||
import scala.util.Try | ||||||
import scala.reflect.runtime.universe.TypeTag | ||||||
|
||||||
private[client] object LoadOps { | ||||||
private val Logger = LoggerFactory.getLogger(this.getClass) | ||||||
|
@@ -119,6 +126,56 @@ final private[client] class LoadOps(client: Client, jobService: JobOps) { | |||||
location = location | ||||||
) | ||||||
|
||||||
/** | ||||||
* Upload List of rows to Cloud Storage as Avro file and load to BigQuery table. Note that element | ||||||
* type `T` must be annotated with [[BigQueryType]]. | ||||||
*/ | ||||||
def uploadTypedRows[T <: HasAnnotation: TypeTag]( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO this naming can be simplified. Other APIs here do not have the I think this should be named
Suggested change
|
||||||
tableSpec: String, | ||||||
rows: List[T], | ||||||
tempLocation: String, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is temp, shouldn't we clean if afterward ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking about bucket retention policy, but yeah, deleting it would be more optimal There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. on the other hand if I do:
I am not confident that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'm wondering also if we should create a |
||||||
writeDisposition: WriteDisposition = WriteDisposition.WRITE_APPEND, | ||||||
createDisposition: CreateDisposition = CreateDisposition.CREATE_IF_NEEDED | ||||||
): Try[TableReference] = { | ||||||
val bqt = BigQueryType[T] | ||||||
|
||||||
Try { | ||||||
val out = new ByteArrayOutputStream() | ||||||
val datumWriter = new GenericDatumWriter[GenericRecord]() | ||||||
val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) | ||||||
try { | ||||||
dataFileWriter.create(bqt.avroSchema, out) | ||||||
rows.foreach { row => | ||||||
dataFileWriter.append(bqt.toAvro(row)) | ||||||
} | ||||||
} finally { | ||||||
dataFileWriter.close() | ||||||
} | ||||||
|
||||||
val blobId = | ||||||
BlobId.fromGsUtilUri( | ||||||
s"${tempLocation.stripSuffix("/")}/upload_${RandomStringUtils.randomAlphanumeric(10)}.avro" | ||||||
) | ||||||
val blobInfo = BlobInfo.newBuilder(blobId).build | ||||||
client.blobStorage.createFrom( | ||||||
blobInfo, | ||||||
new ByteArrayInputStream(out.toByteArray), | ||||||
Storage.BlobWriteOption.doesNotExist(), | ||||||
Storage.BlobWriteOption.crc32cMatch() | ||||||
clairemcginty marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
) | ||||||
|
||||||
blobId | ||||||
}.flatMap { blobId => | ||||||
avro( | ||||||
List(blobId.toGsUtilUri), | ||||||
tableSpec, | ||||||
schema = Some(bqt.schema), | ||||||
createDisposition = createDisposition, | ||||||
writeDisposition = writeDisposition | ||||||
) | ||||||
} | ||||||
} | ||||||
|
||||||
@nowarn("msg=private default argument in class LoadOps is never used") | ||||||
private def execute( | ||||||
sources: List[String], | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in sync with Beam
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your link points to GCP libraries-bom 26.22.0 but beam 2.53 uses 26.28.0 here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, reverting this