Skip to content

Commit

Permalink
Support BQ Json arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Jan 17, 2025
1 parent c8ca2f4 commit 9037aa6
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,12 @@ object TypedBigQueryIT {
)
implicit val arbJson: Arbitrary[Json] = Arbitrary(
for {
key <- Gen.alphaStr
isArray <- Arbitrary.arbBool.arbitrary
// `f` is a key field of TableRow, so it cannot be used as a column name.
// See https://github.com/apache/beam/issues/33531
key <- Gen.alphaStr.retryUntil(_ != "f")
value <- Gen.alphaStr
} yield Json(s"""{"$key":"$value"}""")
} yield Json(if (isArray) s"""["$key","$value"]""" else s"""{"$key":"$value"}""")
)

implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ object TableRowOps {
}

def json(value: AnyRef): Json = value match {
case x: Json => x
case x: TableRow => Json(x)
case x: String => Json(x)
case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value)
case x: Json => x
case x: java.util.Map[_, _] => Json(x)
case x: java.util.List[_] => Json(x)
case x: String => Json(x)
case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value)
}

def bignumeric(value: AnyRef): BigNumeric = value match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,17 @@ package object types {
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

def apply(row: TableRow): Json = Json(mapper.writeValueAsString(row))
def parse(json: Json): TableRow = mapper.readValue(json.wkt, classOf[TableRow])
/** Serializes `value` to a JSON string with the shared `mapper` and wraps the text in a [[Json]]. */
def apply(value: AnyRef): Json = {
  val serialized = mapper.writeValueAsString(value)
  Json(serialized)
}
/**
 * Parses the text held by `json` into a Java collection.
 *
 * @param json the JSON value whose `wkt` text is read
 * @return a `java.util.Map` when the root node is a JSON object, or a `java.util.List` when it
 *         is a JSON array
 * @throws IllegalArgumentException when the root node is neither an object nor an array
 */
def parse(json: Json): AnyRef = {
  val root = mapper.readTree(json.wkt)
  root match {
    // Object roots are materialized as a Java map.
    case obj if obj.isObject =>
      mapper.treeToValue(obj, classOf[java.util.Map[_, _]])
    // Array roots are materialized as a Java list.
    case arr if arr.isArray =>
      mapper.treeToValue(arr, classOf[java.util.List[_]])
    // Any other root node kind is rejected.
    case _ =>
      throw new IllegalArgumentException(s"Invalid json ${json.wkt}")
  }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ final class ConverterProviderSpec
}
implicit val arbJson: Arbitrary[Json] = Arbitrary(
for {
isArray <- Arbitrary.arbBool.arbitrary
// `f` is a key field of TableRow, so it cannot be used as a column name.
// See https://github.com/apache/beam/issues/33531
key <- Gen.alphaStr.retryUntil(_ != "f")
value <- Gen.alphaStr
} yield Json(s"""{"$key":"$value"}""")
} yield Json(if (isArray) s"""["$key","$value"]""" else s"""{"$key":"$value"}""")
)
implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec

import scala.jdk.CollectionConverters._

class ConverterProviderTest extends AnyFlatSpec with Matchers {
import ConverterProviderTest._

Expand All @@ -49,13 +51,42 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {
}

it should "handle required json type" in {
val wkt = """{"name":"Alice","age":30}"""
val parsed = new TableRow()
.set("name", "Alice")
.set("age", 30)
// JSON object
{
val wkt = """{"name":"Alice","age":30,"address":{"street":"Broadway","city":"New York"}}"""
val parsed = Map(
"name" -> "Alice",
"age" -> 30,
"address" -> Map(
"street" -> "Broadway",
"city" -> "New York"
).asJava
).asJava

RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow(
"a" -> parsed
)
}

// JSON array
{
val wkt = """["Alice",30,{"street":"Broadway","city":"New York"}]"""
val parsed = List(
"Alice",
30,
Map(
"street" -> "Broadway",
"city" -> "New York"
).asJava
).asJava

RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow(
"a" -> parsed
)
}

RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> parsed)
}

it should "handle required big numeric type" in {
Expand Down

0 comments on commit 9037aa6

Please sign in to comment.