Streamline parquet-avro with avro API #5227

Draft · wants to merge 5 commits into base: v0.15.x
build.sbt (15 changes: 10 additions & 5 deletions)
@@ -738,6 +738,10 @@ lazy val `scio-avro` = project
.settings(macroSettings)
.settings(
description := "Scio add-on for working with Avro",
+unusedCompileDependenciesFilter -= Seq(
+// transitively required for parquet-avro projection macros
+moduleFilter("org.apache.avro", "avro-compiler")
+).reduce(_ | _),
libraryDependencies ++= Seq(
// compile
"com.esotericsoftware" % "kryo-shaded" % kryoVersion,
@@ -746,6 +750,7 @@ lazy val `scio-avro` = project
"com.twitter" % "chill-java" % chillVersion,
"me.lyh" %% "protobuf-generic" % protobufGenericVersion,
"org.apache.avro" % "avro" % avroVersion,
"org.apache.avro" % "avro-compiler" % avroVersion,
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion,
"org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion,
@@ -1035,8 +1040,8 @@ lazy val `scio-parquet` = project
.in(file("scio-parquet"))
.dependsOn(
`scio-core`,
`scio-avro` % "provided",
`scio-tensorflow` % "provided",
`scio-avro` % Test,
`scio-test` % "test->test"
)
.settings(commonSettings)
@@ -1050,7 +1055,7 @@ lazy val `scio-parquet` = project
javacOptions ++= Seq("-s", (sourceManaged.value / "main").toString),
description := "Scio add-on for Parquet",
unusedCompileDependenciesFilter -= Seq(
-// required by me.lyh:parquet-avro
+// required by me.lyh:parquet-avro macros
moduleFilter("org.apache.avro", "avro-compiler"),
// replacing log4j compile time dependency
moduleFilter("org.slf4j", "log4j-over-slf4j")
@@ -1061,10 +1066,7 @@ lazy val `scio-parquet` = project
"com.google.cloud.bigdataoss" % "util-hadoop" % s"hadoop2-$bigdataossVersion",
"com.google.protobuf" % "protobuf-java" % protobufVersion,
"com.spotify" %% "magnolify-parquet" % magnolifyVersion,
"com.twitter" %% "chill" % chillVersion,
"me.lyh" %% "parquet-avro" % parquetExtraVersion,
"org.apache.avro" % "avro" % avroVersion,
"org.apache.avro" % "avro-compiler" % avroVersion,
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-hadoop-common" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-hadoop-format" % beamVersion,
@@ -1078,6 +1080,9 @@ lazy val `scio-parquet` = project
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion, // log4j is excluded from hadoop
"org.slf4j" % "slf4j-api" % slf4jVersion,
// provided
"org.apache.avro" % "avro" % avroVersion % Provided,
"org.apache.avro" % "avro-compiler" % avroVersion % Provided,
"org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion % Provided,
"org.tensorflow" % "tensorflow-core-api" % tensorFlowVersion % Provided,
// runtime
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % Runtime excludeAll (Exclude.metricsCore),
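With this change scio-parquet no longer ships Avro at compile scope, so downstream builds must declare it themselves. A minimal sketch of what a consumer's build.sbt might add (artifact coordinates are real; version strings are illustrative, not from this PR):

// Hypothetical consumer build.sbt: Avro must now be supplied explicitly
// because scio-parquet declares it as Provided.
libraryDependencies ++= Seq(
  "com.spotify" %% "scio-parquet" % "0.15.0",
  "org.apache.avro" % "avro" % "1.11.3"
)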
@@ -37,7 +37,7 @@ import scala.util.{Failure, Success}
private[coders] object VoidCoder extends AtomicCoder[Void] {
override def encode(value: Void, outStream: OutputStream): Unit = ()

-override def decode(inStream: InputStream): Void = ???
+override def decode(inStream: InputStream): Void = null

override def structuralValue(value: Void): AnyRef = AnyRef
}
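Returning `null` instead of `???` makes the coder usable: `???` throws scala.NotImplementedError on every call, while `null` is the only value a java.lang.Void reference can hold. A minimal round-trip sketch of the fixed behavior (assumes access from within the coders package, since VoidCoder is private[coders]):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

// Encode writes nothing; decode now returns the sole legal Void value
// instead of throwing NotImplementedError.
val out = new ByteArrayOutputStream()
VoidCoder.encode(null, out)
val in = new ByteArrayInputStream(out.toByteArray)
assert(VoidCoder.decode(in) == null)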
@@ -41,7 +41,13 @@ object ParquetExample {
* These case classes represent both full and projected field mappings from the [[Account]] Avro
* record.
*/
-case class AccountFull(id: Int, `type`: String, name: Option[String], amount: Double)
+case class AccountFull(
+id: Int,
+`type`: String,
+name: Option[String],
+amount: Double,
+accountStatus: Option[AccountStatus]
+)
case class AccountProjection(id: Int, name: Option[String])

/**
@@ -108,21 +114,19 @@ object ParquetExample {

private def avroSpecificIn(sc: ScioContext, args: Args): ClosedTap[String] = {
// Macros for generating column projections and row predicates
-val projection = Projection[Account](_.getId, _.getName, _.getAmount)
+// account_status is the only field with a default value that can be left out of the projection
+val projection = Projection[Account](_.getId, _.getType, _.getName, _.getAmount)
val predicate = Predicate[Account](x => x.getAmount > 0)

sc.parquetAvroFile[Account](args("input"), projection, predicate)
-// The result Account records are not complete Avro objects. Only the projected columns are present while the rest are null.
-// These objects may fail serialization and it’s recommended that you map them out to tuples or case classes right after reading.
-.map(x => AccountProjection(x.getId, Some(x.getName.toString)))
.saveAsTextFile(args("output"))
}
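The projection comment above leans on Avro's default-value rule: a field may be dropped from a projection only when the schema declares a default for it, which the reader then substitutes. A small self-contained sketch of that rule (the schema here is hypothetical, not the actual Account schema):

import org.apache.avro.Schema

// Hypothetical record: account_status is a nullable union with default null,
// so a Projection may omit it and readers materialize the default.
val withDefault = new Schema.Parser().parse(
  """{"type": "record", "name": "Example", "fields": [
    |  {"name": "id", "type": "int"},
    |  {"name": "account_status", "type": ["null", "string"], "default": null}
    |]}""".stripMargin
)
assert(withDefault.getField("account_status").hasDefaultValue)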

private def avroGenericIn(sc: ScioContext, args: Args): ClosedTap[String] = {
val schema = Account.getClassSchema
implicit val genericRecordCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema)

-val parquetIn = sc.parquetAvroFile[GenericRecord](args("input"), schema)
+val parquetIn = sc.parquetAvroGenericRecordFile(args("input"), schema)

// Catches a specific bug with encoding GenericRecords read by parquet-avro
parquetIn
@@ -146,12 +150,19 @@
// but close to `parquet.block.size`, i.e. 1 GiB. This guarantees that each file contains 1 row group only and reduces seeks.
.saveAsParquetAvroFile(args("output"), numShards = 1, conf = fineTunedParquetWriterConfig)
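fineTunedParquetWriterConfig is defined above this hunk and not shown; a plausible sketch of such a configuration, using the standard Parquet-Hadoop property the comment refers to (values assumed, not taken from this PR):

import org.apache.hadoop.conf.Configuration

// Hypothetical tuning: 1 GiB row groups, so each output file holds a
// single row group, matching the comment above.
val fineTunedParquetWriterConfig = new Configuration()
fineTunedParquetWriterConfig.setLong("parquet.block.size", 1L << 30)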

+private[extra] def toScalaFull(account: Account): AccountFull =
+AccountFull(
+account.getId,
+account.getType.toString,
+Some(account.getName.toString),
+account.getAmount,
+Some(account.getAccountStatus)
+)
+
private def typedOut(sc: ScioContext, args: Args): ClosedTap[AccountFull] =
sc.parallelize(fakeData)
-.map(x => AccountFull(x.getId, x.getType.toString, Some(x.getName.toString), x.getAmount))
-.saveAsTypedParquetFile(
-args("output")
-)
+.map(toScalaFull)
+.saveAsTypedParquetFile(args("output"))
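For round-tripping, the typed output written here can be read back with the case-class API; a minimal sketch assuming scio's magnolify-backed typed Parquet read as documented for scio-parquet:

// Hypothetical read-back of the typed output written above.
import com.spotify.scio.parquet.types._

val accounts = sc.typedParquetFile[AccountFull](args("output"))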

private[extra] def toExample(account: Account): Example = {
val amount = Feature
@@ -46,7 +46,8 @@ class ParquetExampleTest extends PipelineSpec {

"ParquetExample" should "work for SpecificRecord input" in {
val expected = ParquetExample.fakeData
-.map(x => AccountProjection(x.getId, Some(x.getName.toString)))
+// set default value on field outside projection
+.map(x => Account.newBuilder(x).setAccountStatus(null).build())
.map(_.toString)

JobTest[com.spotify.scio.examples.extra.ParquetExample.type]
@@ -79,8 +80,7 @@
}

it should "work for typed output" in {
-val expected = ParquetExample.fakeData
-.map(a => AccountFull(a.getId, a.getType.toString, Some(a.getName.toString), a.getAmount))
+val expected = ParquetExample.fakeData.map(ParquetExample.toScalaFull)

JobTest[com.spotify.scio.examples.extra.ParquetExample.type]
.args("--output=out.parquet", "--method=typedOut")