Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException
import org.apache.beam.sdk.PipelineResult.State
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.metrics.Counter
import org.apache.beam.sdk.metrics.{
Counter,
Lineage,
MetricNameFilter,
MetricQueryResults,
MetricsFilter,
MetricsOptions,
MetricsSink
}
import org.apache.beam.sdk.options._
import org.apache.beam.sdk.transforms._
import org.apache.beam.sdk.values._
Expand Down Expand Up @@ -247,6 +255,39 @@ object ContextAndArgs {
}
}

/**
 * Beam `MetricsSink` that, on every metrics push from the runner, logs the
 * lineage `StringSet` and `BoundedTrie` metric values extracted from the
 * supplied `MetricQueryResults`.
 *
 * NOTE(review): this sink is registered via `MetricsOptions.setMetricsSink`
 * with a 5-second push period; the "!!!!!!!" WARN-level messages read as
 * temporary debug output — confirm whether this should ship at this level.
 */
class CustomMetricsSink extends MetricsSink {
  private[scio] val log = LoggerFactory.getLogger(this.getClass)

  /** Logs the lineage values found in `metricQueryResults` at WARN level. */
  override def writeMetrics(metricQueryResults: MetricQueryResults): Unit = {
    log.warn("!!!!!!! WRITE METRIC stringSets = " + queryLineageStringSets(metricQueryResults))
    log.warn("!!!!!!! WRITE METRIC boundedTries = " + queryLineageBoundedTries(metricQueryResults))
  }

  /**
   * Flattens all string-set metric values into a single set of strings.
   *
   * Committed values are preferred, falling back to attempted ones; the `Try`
   * wrappers guard the accessor calls — presumably because some runners throw
   * from `getCommitted` — NOTE(review): assumption, confirm for the runner in use.
   */
  private def queryLineageStringSets(results: MetricQueryResults): Set[String] = {
    results.getStringSets.asScala.flatMap { metric =>
      Try(metric.getCommitted.getStringSet.asScala.toSet)
        .orElse(Try(metric.getAttempted.getStringSet.asScala.toSet))
        .getOrElse(Set.empty[String])
    }.toSet
  }

  /**
   * Reconstructs fully-qualified lineage names from bounded-trie metrics.
   *
   * Each trie entry is a list of segments whose last element is expected to be
   * a "true"/"false" truncation flag; truncated entries are rendered with a
   * trailing "*".
   *
   * NOTE(review): `mkString` joins segments with NO separator — this assumes
   * each segment already carries its own delimiter; confirm against the Beam
   * Lineage encoding. Also note `segments.last.toBoolean` throws on a
   * non-boolean last segment, and because the `Try` wraps the whole
   * `processResult`, one malformed entry discards ALL entries of that metric,
   * not just the bad one.
   */
  private def queryLineageBoundedTries(results: MetricQueryResults): Set[String] = {
    results.getBoundedTries.asScala.flatMap { metric =>
      val processResult = (result: java.util.Set[java.util.List[String]]) =>
        result.asScala.map { fqn =>
          val segments = fqn.asScala.toList
          val truncated = if (segments.nonEmpty && segments.last.toBoolean) "*" else ""
          segments.dropRight(1).mkString + truncated
        }.toSet

      // Committed-first with attempted fallback, mirroring queryLineageStringSets.
      Try(processResult(metric.getCommitted.getResult))
        .orElse(Try(processResult(metric.getAttempted.getResult)))
        .getOrElse(Set.empty[String])
    }.toSet
  }

}

/**
* ScioExecutionContext is the result of [[ScioContext#run()]].
*
Expand Down Expand Up @@ -378,6 +419,11 @@ object ScioContext {
.as(classOf[ScioOptions])
.setAppArguments(sanitizedArgString)
}

val metricsOptions = pipelineOpts.as(classOf[MetricsOptions])
metricsOptions.setMetricsSink(classOf[CustomMetricsSink])
metricsOptions.setMetricsPushPeriod(5)

(pipelineOpts, args)
}
}
Expand Down
23 changes: 17 additions & 6 deletions scio-core/src/main/scala/com/spotify/scio/ScioResult.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.fs.{ResolveOptions, ResourceId}
import org.apache.beam.sdk.metrics.{DistributionResult, GaugeResult, Lineage}
import org.apache.beam.sdk.util.MimeTypes
import org.apache.beam.sdk.{metrics => beam, PipelineResult}

import org.joda.time.Instant
import org.slf4j.LoggerFactory

import java.io.File
import java.nio.ByteBuffer
Expand All @@ -46,6 +46,7 @@ trait RunnerResult {

/** Represent a Scio pipeline result. */
abstract class ScioResult private[scio] (val internal: PipelineResult) {
private val logger = LoggerFactory.getLogger(getClass)

/** Get a Beam runner specific result. */
def as[T <: RunnerResult: ClassTag]: T = {
Expand Down Expand Up @@ -104,16 +105,26 @@ abstract class ScioResult private[scio] (val internal: PipelineResult) {

/**
 * Serializes `value` as JSON and writes it to `resourceId`.
 *
 * The write is best-effort: on failure the metrics payload is logged at WARN
 * (together with the exception) instead of failing the pipeline result.
 */
private def saveJsonFile(resourceId: ResourceId, value: Object): Unit = {
  import scala.util.control.NonFatal
  val mapper = ScioUtil.getScalaJsonMapper
  try {
    val out = FileSystems.create(resourceId, MimeTypes.TEXT)
    try {
      out.write(ByteBuffer.wrap(mapper.writeValueAsBytes(value)))
    } finally {
      // FileSystems.create should not return null, but guard defensively.
      if (out != null) {
        out.close()
      }
    }
    logger.info(f"Saved metrics to '$resourceId'")
  } catch {
    // NonFatal instead of Throwable: let OOM / interrupts propagate rather
    // than being swallowed by the best-effort logging path.
    case NonFatal(e) =>
      logger.warn(
        f"Failed to save metrics to '$resourceId': ${mapper.writeValueAsString(value)}",
        e
      )
  }
}

/** Get lineage metric values. */
def getBeamLineage: BeamLineage = {
def asScalaCrossCompatible(set: java.util.Set[String]): Iterable[String] = {
val iterator = set.iterator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.spotify.scio.examples

import com.spotify.scio.util.ScioUtil
import com.spotify.scio.{ContextAndArgs, ScioContext}
import com.spotify.scio.{ContextAndArgs, ScioContext, ScioResult}
import org.slf4j.LoggerFactory

import scala.concurrent.duration._
Expand All @@ -33,7 +33,8 @@ object RunPreReleaseIT {
"--runner=DataflowRunner",
"--project=data-integration-test",
"--region=us-central1",
"--tempLocation=gs://dataflow-tmp-us-central1/gha"
"--tempLocation=gs://dataflow-tmp-us-central1/gha",
"--dataflowServiceOptions=enable_lineage=true"
)

private val log = LoggerFactory.getLogger(getClass)
Expand All @@ -51,6 +52,33 @@ object RunPreReleaseIT {
}
}

/**
 * Asserts that `metrics` contains at least one lineage entry that starts with
 * `prefix` (e.g. "gcs:", "bigquery:") and contains the `path` fragment.
 *
 * @throws AssertionError when no matching lineage entry is found
 */
private def assertLineageHasGcsItem(metrics: List[String], prefix: String, path: String): Unit = {
  if (!metrics.exists(m => m.startsWith(prefix) && m.contains(path))) {
    // AssertionError (a Throwable subclass) instead of a bare Throwable:
    // clearer intent, and anything catching Throwable still catches it.
    throw new AssertionError(f"Expected to find '$prefix...$path...' in $metrics")
  }
}

/**
 * Verifies that a job's Beam lineage contains the expected source/sink items.
 *
 * Each of `assertInput`, `assertInput2` and `assertOutput` is only checked
 * when non-null; inputs are matched against lineage sources and the output
 * against lineage sinks, all under the given `prefix` (default "gcs:").
 */
private def assertLineageIsPopulated(
  scioResult: ScioResult
)(
  prefix: String = "gcs:",
  assertInput: String = null,
  assertInput2: String = null,
  assertOutput: String = null
): Unit = {
  // Option(x) maps null to None, so each check is skipped when unset; the
  // lineage is queried per check, exactly as the explicit null-guards did.
  Option(assertInput).foreach { expected =>
    assertLineageHasGcsItem(scioResult.getBeamLineage.sources, prefix, expected)
  }
  Option(assertInput2).foreach { expected =>
    assertLineageHasGcsItem(scioResult.getBeamLineage.sources, prefix, expected)
  }
  Option(assertOutput).foreach { expected =>
    assertLineageHasGcsItem(scioResult.getBeamLineage.sinks, prefix, expected)
  }
}

private def avro(runId: String): Future[Unit] = {
import com.spotify.scio.examples.extra.AvroExample

Expand All @@ -59,10 +87,12 @@ object RunPreReleaseIT {

log.info("Starting Avro write tests... ")
val write = invokeJob[AvroExample.type]("--method=specificOut", s"--output=$out1")
write.flatMap { _ =>
log.info("Starting Avro read tests... ")
invokeJob[AvroExample.type]("--method=specificIn", s"--input=$out1/*", s"--output=$out2")
}
write
.flatMap { _ =>
log.info("Starting Avro read tests... ")
invokeJob[AvroExample.type]("--method=specificIn", s"--input=$out1/*", s"--output=$out2")
}
.map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out2))
}

private def parquet(runId: String): Future[Unit] = {
Expand All @@ -77,34 +107,37 @@ object RunPreReleaseIT {

log.info("Starting Parquet write tests... ")
val writes = List(
invokeJob[ParquetExample.type]("--method=avroOut", s"--output=$out1"),
invokeJob[ParquetExample.type]("--method=avroOut", s"--output=$out1")
.map(assertLineageIsPopulated(_)(assertOutput = out2)),
invokeJob[ParquetExample.type]("--method=exampleOut", s"--output=$out5")
.map(assertLineageIsPopulated(_)(assertOutput = out5))
)

Future.sequence(writes).flatMap { _ =>
log.info("Starting Parquet read tests... ")
val reads = List(
invokeJob[ParquetExample.type]("--method=typedIn", s"--input=$out1/*", s"--output=$out2"),
invokeJob[ParquetExample.type]("--method=typedIn", s"--input=$out1/*", s"--output=$out2")
.map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out2)),
invokeJob[ParquetExample.type](
"--method=avroSpecificIn",
s"--input=$out1/*",
s"--output=$out3"
),
).map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out3)),
invokeJob[ParquetExample.type](
"--method=avroGenericIn",
s"--input=$out1/*",
s"--output=$out4"
),
).map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out4)),
invokeJob[ParquetExample.type](
"--method=avroGenericIn",
s"--input=$out1/*",
s"--output=$out4"
),
).map(assertLineageIsPopulated(_)(assertInput = out1, assertOutput = out4)),
invokeJob[ParquetExample.type](
"--method=exampleIn",
s"--input=$out5/*",
s"--output=$out6"
)
).map(assertLineageIsPopulated(_)(assertInput = out5, assertOutput = out6))
)
Future.sequence(reads).map(_ => ())
}
Expand All @@ -118,21 +151,41 @@ object RunPreReleaseIT {
}
val out1 = gcsPath[SortMergeBucketWriteExample.type]("users", runId)
val out2 = gcsPath[SortMergeBucketWriteExample.type]("accounts", runId)
val joinPath = gcsPath[SortMergeBucketWriteExample.type]("join", runId)
val transformPath = gcsPath[SortMergeBucketWriteExample.type]("transform", runId)

log.info("Starting SMB write tests... ")
val write = invokeJob[SortMergeBucketWriteExample.type](s"--users=$out1", s"--accounts=$out2")
val write =
invokeJob[SortMergeBucketWriteExample.type](s"--users=$out1", s"--accounts=$out2").map(
assertLineageIsPopulated(_)(
assertInput = out1,
assertOutput = out2
)
)
write.flatMap { _ =>
log.info("Starting SMB read tests... ")
val readJobs = List(
invokeJob[SortMergeBucketJoinExample.type](
s"--users=$out1",
s"--accounts=$out2",
s"--output=${gcsPath[SortMergeBucketJoinExample.type]("join", runId)}"
s"--output=$joinPath"
).map(
assertLineageIsPopulated(_)(
assertInput = out1,
assertInput2 = out2,
assertOutput = joinPath
)
),
invokeJob[SortMergeBucketTransformExample.type](
s"--users=$out1",
s"--accounts=$out2",
s"--output=${gcsPath[SortMergeBucketTransformExample.type]("transform", runId)}"
s"--output=$transformPath"
).map(
assertLineageIsPopulated(_)(
assertInput = out1,
assertInput2 = out2,
assertOutput = transformPath
)
)
)
Future.sequence(readJobs).map(_ => ())
Expand All @@ -145,15 +198,25 @@ object RunPreReleaseIT {
val jobs = List(
invokeJob[TypedStorageBigQueryTornadoes.type](
s"--output=data-integration-test:gha_it_us.typed_storage"
).map(
assertLineageIsPopulated(_)(
prefix = "bigquery:",
assertOutput = "data-integration-test.gha_it_us.typed_storage"
)
),
invokeJob[TypedBigQueryTornadoes.type](
s"--output=data-integration-test:gha_it_us.typed_row"
).map(
assertLineageIsPopulated(_)(
prefix = "bigquery:",
assertOutput = "data-integration-test.gha_it_us.typed_row"
)
)
)
Future.sequence(jobs).map(_ => ())
}

private def invokeJob[T: ClassTag](args: String*): Future[Unit] = {
private def invokeJob[T: ClassTag](args: String*): Future[ScioResult] = {
val cls = ScioUtil.classOf[T]
val jobObjName = cls.getName.replaceAll("\\$$", "")
val pipelines = Class
Expand Down
Loading