From a9037d1f2fd615c06ba3ef7bee693739e43c9b27 Mon Sep 17 00:00:00 2001 From: "balazs.nemeth@doctusoft.com" Date: Tue, 15 Nov 2022 09:52:55 +0100 Subject: [PATCH 1/3] #24167 Document<->Row conversion for FirestoreIO --- .../sdk/io/gcp/firestore/DocumentToRow.java | 163 ++++++++++++ .../sdk/io/gcp/firestore/FirestoreHelper.java | 248 ++++++++++++++++++ .../sdk/io/gcp/firestore/RowToDocument.java | 176 +++++++++++++ 3 files changed, 587 insertions(+) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRow.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreHelper.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RowToDocument.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRow.java new file mode 100644 index 000000000000..2d0c9d8cc2d2 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRow.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.firestore; + +import static org.apache.beam.sdk.io.gcp.firestore.FirestoreHelper.makeReferenceValue; + +import com.google.firestore.v1.Document; +import com.google.firestore.v1.MapValue; +import com.google.firestore.v1.Value; +import com.google.protobuf.util.Timestamps; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A {@code PTransform} to perform a conversion of {@link Document} to {@link Row}. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class DocumentToRow extends PTransform, PCollection> { + private final Schema schema; + private final String keyField; + private static final Logger LOG = LoggerFactory.getLogger(DocumentToRow.class); + + private DocumentToRow(Schema schema, String keyField) { + this.schema = schema; + this.keyField = keyField; + + if (schema.getFieldNames().contains(keyField)) { + if (!schema.getField(keyField).getType().getTypeName().equals(Schema.TypeName.STRING)) { + throw new IllegalStateException( + "Field `" + + keyField + + "` should of type `STRING`. Please change the type or specify a field to" + + " store the KEY value."); + } + LOG.info("Document KEY will be stored under `" + keyField + "` field."); + } + } + + /** + * Create a PTransform instance. + * + * @param schema {@code Schema} of the target row. + * @param keyField A name of the row field to store the {@code Key} in. + * @return {@code PTransform} instance for Document to Row conversion. + */ + public static DocumentToRow create(Schema schema, String keyField) { + return new DocumentToRow(schema, keyField); + } + + @Override + public PCollection expand(PCollection input) { + return input.apply(ParDo.of(new DocumentToRow.DocumentToRowConverter())).setRowSchema(schema); + } + + class DocumentToRowConverter extends DoFn { + + @ProcessElement + public void processElement(ProcessContext context) { + Document document = context.element(); + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + mapBuilder.put(keyField, makeReferenceValue(document.getName()).build()); + mapBuilder.putAll(document.getFieldsMap()); + + context.output(extractRowFromProperties(schema, mapBuilder.build())); + } + + /** + * Convert Firestore {@code Value} to Beam type. + * + * @param currentFieldType Beam {@code Schema.FieldType} to convert to (used for {@code Row} and + * {@code Array}). + * @param val Firestore {@code Value}. + * @return resulting Beam type. + */ + private Object convertValueToObject(Schema.FieldType currentFieldType, Value val) { + Value.ValueTypeCase typeCase = val.getValueTypeCase(); + + switch (typeCase) { + case NULL_VALUE: + case VALUETYPE_NOT_SET: + return null; + case BOOLEAN_VALUE: + return val.getBooleanValue(); + case INTEGER_VALUE: + return val.getIntegerValue(); + case DOUBLE_VALUE: + return val.getDoubleValue(); + case TIMESTAMP_VALUE: + com.google.protobuf.Timestamp time = val.getTimestampValue(); + long millis = Timestamps.toMillis(time); + return Instant.ofEpochMilli(millis).toDateTime(); + case STRING_VALUE: + return val.getStringValue(); + case REFERENCE_VALUE: + return val.getReferenceValue(); + case BYTES_VALUE: + return val.getBytesValue().toByteArray(); + case MAP_VALUE: + // Recursive mapping for row type. + Schema rowSchema = currentFieldType.getRowSchema(); + assert rowSchema != null; + MapValue map = val.getMapValue(); + return extractRowFromProperties(rowSchema, map.getFieldsMap()); + case ARRAY_VALUE: + // Recursive mapping for collection type. + Schema.FieldType elementType = currentFieldType.getCollectionElementType(); + List valueList = val.getArrayValue().getValuesList(); + return valueList.stream() + .map(v -> convertValueToObject(elementType, v)) + .collect(Collectors.toList()); + case GEO_POINT_VALUE: + default: + throw new IllegalStateException( + "No conversion exists from type: " + + val.getValueTypeCase().name() + + " to Beam type."); + } + } + + /** + * Converts all properties of an {@code Document} to Beam {@code Row}. + * + * @param schema Target row {@code Schema}. + * @param values A map of property names and values. + * @return resulting Beam {@code Row}. + */ + private Row extractRowFromProperties(Schema schema, Map values) { + Row.Builder builder = Row.withSchema(schema); + // It is not a guarantee that the values will be in the same order as the schema. + for (Schema.Field field : schema.getFields()) { + Value val = values.get(field.getName()); + builder.addValue(convertValueToObject(field.getType(), val)); + } + return builder.build(); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreHelper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreHelper.java new file mode 100644 index 000000000000..4f5cbb087e0c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreHelper.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.firestore; + +import com.google.firestore.v1.ArrayValue; +import com.google.firestore.v1.MapValue; +import com.google.firestore.v1.Value; +import com.google.firestore.v1.Value.ValueTypeCase; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.protobuf.TimestampOrBuilder; +import com.google.type.LatLng; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.logging.Logger; + +/** Helper methods for {@link Firestore}. */ +// TODO: Accept OrBuilders when possible. +public final class FirestoreHelper { + private static final Logger logger = Logger.getLogger(FirestoreHelper.class.getName()); + + private static final int MICROSECONDS_PER_SECOND = 1000 * 1000; + private static final int NANOSECONDS_PER_MICROSECOND = 1000; + + private FirestoreHelper() {} + + /** Make an array value containing the specified values. */ + public static Value.Builder makeValue(Iterable values) { + return Value.newBuilder().setArrayValue(ArrayValue.newBuilder().addAllValues(values)); + } + + /** Make a list value containing the specified values. */ + public static Value.Builder makeValue(Value value1, Value value2, Value... rest) { + ArrayValue.Builder arrayValue = ArrayValue.newBuilder(); + arrayValue.addValues(value1); + arrayValue.addValues(value2); + arrayValue.addAllValues(Arrays.asList(rest)); + return Value.newBuilder().setArrayValue(arrayValue); + } + + /** Make an array value containing the specified values. */ + public static Value.Builder makeValue( + Value.Builder value1, Value.Builder value2, Value.Builder... rest) { + ArrayValue.Builder arrayValue = ArrayValue.newBuilder(); + arrayValue.addValues(value1); + arrayValue.addValues(value2); + for (Value.Builder builder : rest) { + arrayValue.addValues(builder); + } + return Value.newBuilder().setArrayValue(arrayValue); + } + + /** Make a reference value. */ + public static Value.Builder makeReferenceValue(String value) { + return Value.newBuilder().setReferenceValue(value); + } + + /** Make an integer value. */ + public static Value.Builder makeValue(long value) { + return Value.newBuilder().setIntegerValue(value); + } + + /** Make a floating point value. */ + public static Value.Builder makeValue(double value) { + return Value.newBuilder().setDoubleValue(value); + } + + /** Make a boolean value. */ + public static Value.Builder makeValue(boolean value) { + return Value.newBuilder().setBooleanValue(value); + } + + /** Make a string value. */ + public static Value.Builder makeStringValue(String value) { + return Value.newBuilder().setStringValue(value); + } + + /** Make a map value. */ + public static Value.Builder makeValue(MapValue mapValue) { + return Value.newBuilder().setMapValue(mapValue); + } + + /** Make a map value. */ + public static Value.Builder makeValue(MapValue.Builder mapValue) { + return makeValue(mapValue.build()); + } + + /** Make a ByteString value. */ + public static Value.Builder makeValue(ByteString blob) { + return Value.newBuilder().setBytesValue(blob); + } + + /** Make a timestamp value given a date. */ + public static Value.Builder makeValue(Date date) { + return Value.newBuilder().setTimestampValue(toTimestamp(date.getTime() * 1000L)); + } + + private static Timestamp.Builder toTimestamp(long microseconds) { + long seconds = microseconds / MICROSECONDS_PER_SECOND; + long microsecondsRemainder = microseconds % MICROSECONDS_PER_SECOND; + if (microsecondsRemainder < 0) { + // Nanos must be positive even if microseconds is negative. + // Java modulus doesn't take care of this for us. + microsecondsRemainder += MICROSECONDS_PER_SECOND; + seconds -= 1; + } + return Timestamp.newBuilder() + .setSeconds(seconds) + .setNanos((int) microsecondsRemainder * NANOSECONDS_PER_MICROSECOND); + } + + /** Makes a GeoPoint value. */ + public static Value.Builder makeValue(LatLng value) { + return Value.newBuilder().setGeoPointValue(value); + } + + /** + * @return the double contained in value + * @throws IllegalArgumentException if the value does not contain a double. + */ + public static double getDouble(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.DOUBLE_VALUE) { + throw new IllegalArgumentException("Value does not contain a double."); + } + return value.getDoubleValue(); + } + + /** + * @return the reference contained in value + * @throws IllegalArgumentException if the value does not contain a reference. + */ + public static String getReference(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.REFERENCE_VALUE) { + throw new IllegalArgumentException("Value does not contain a reference."); + } + return value.getReferenceValue(); + } + + /** + * @return the blob contained in value + * @throws IllegalArgumentException if the value does not contain a blob. + */ + public static ByteString getByteString(Value value) { + if (value.getValueTypeCase() == ValueTypeCase.BYTES_VALUE) { + return value.getBytesValue(); + } + throw new IllegalArgumentException("Value does not contain a blob."); + } + + /** + * @return the map contained in value + * @throws IllegalArgumentException if the value does not contain a map. + */ + public static MapValue getMap(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.MAP_VALUE) { + throw new IllegalArgumentException("Value does not contain a Map."); + } + return value.getMapValue(); + } + + /** + * @return the string contained in value + * @throws IllegalArgumentException if the value does not contain a string. + */ + public static String getString(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.STRING_VALUE) { + throw new IllegalArgumentException("Value does not contain a string."); + } + return value.getStringValue(); + } + + /** + * @return the boolean contained in value + * @throws IllegalArgumentException if the value does not contain a boolean. + */ + public static boolean getBoolean(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.BOOLEAN_VALUE) { + throw new IllegalArgumentException("Value does not contain a boolean."); + } + return value.getBooleanValue(); + } + + /** + * @return the long contained in value + * @throws IllegalArgumentException if the value does not contain a long. + */ + public static long getLong(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.INTEGER_VALUE) { + throw new IllegalArgumentException("Value does not contain an integer."); + } + return value.getIntegerValue(); + } + + /** + * @return the timestamp in microseconds contained in value + * @throws IllegalArgumentException if the value does not contain a timestamp. + */ + public static long getTimestamp(Value value) { + if (value.getValueTypeCase() == ValueTypeCase.TIMESTAMP_VALUE) { + return toMicroseconds(value.getTimestampValue()); + } + throw new IllegalArgumentException("Value does not contain a timestamp."); + } + + private static long toMicroseconds(TimestampOrBuilder timestamp) { + // Nanosecond precision is lost. + return timestamp.getSeconds() * MICROSECONDS_PER_SECOND + + timestamp.getNanos() / NANOSECONDS_PER_MICROSECOND; + } + + /** + * @return the array contained in value as a list. + * @throws IllegalArgumentException if the value does not contain an array. + */ + public static List getList(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.ARRAY_VALUE) { + throw new IllegalArgumentException("Value does not contain an array."); + } + return value.getArrayValue().getValuesList(); + } + + /** + * Convert a timestamp value into a {@link Date} clipping off the microseconds. + * + * @param value a timestamp value to convert + * @return the resulting {@link Date} + * @throws IllegalArgumentException if the value does not contain a timestamp. + */ + public static Date toDate(Value value) { + return new Date(getTimestamp(value) / 1000); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RowToDocument.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RowToDocument.java new file mode 100644 index 000000000000..511caa812a66 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RowToDocument.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.firestore; + +import static org.apache.beam.sdk.io.gcp.firestore.FirestoreHelper.makeStringValue; +import static org.apache.beam.sdk.io.gcp.firestore.FirestoreHelper.makeValue; + +import com.google.firestore.v1.Document; +import com.google.firestore.v1.MapValue; +import com.google.firestore.v1.Value; +import com.google.protobuf.ByteString; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A {@code PTransform} to perform a conversion of {@link Row} to {@link Document}. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class RowToDocument extends PTransform, PCollection> { + private final String keyField; + private static final Logger LOG = LoggerFactory.getLogger(RowToDocument.class); + + private RowToDocument(String keyField) { + this.keyField = keyField; + } + + @Override + public PCollection expand(PCollection input) { + if (!input + .getSchema() + .getField(keyField) + .getType() + .getTypeName() + .equals(Schema.TypeName.STRING)) { + throw new IllegalStateException( + "Field `" + + keyField + + "` should of type `STRING`. Please change the type or specify a field to" + + " write the KEY value from."); + } + LOG.info("Field to use as Document KEY is set to: `" + keyField + "`."); + return input.apply(ParDo.of(new RowToDocument.RowToDocumentConverter())); + } + + /** + * Create a PTransform instance. + * + * @param keyField Row field containing a String key, must be set. + * @return {@code PTransform} instance for Row to Document conversion. + */ + public static RowToDocument create(String keyField) { + return new RowToDocument(keyField); + } + + class RowToDocumentConverter extends DoFn { + RowToDocumentConverter() { + super(); + } + + @ProcessElement + public void processElement(ProcessContext context) { + Row row = context.element(); + + Schema schemaWithoutKeyField = + Schema.builder() + .addFields( + row.getSchema().getFields().stream() + .filter(field -> !field.getName().equals(keyField)) + .collect(Collectors.toList())) + .build(); + Document.Builder documentBuilder = Document.newBuilder(); + documentBuilder.putAllFields(constructMapFromRow(schemaWithoutKeyField, row).getFieldsMap()); + documentBuilder.setName(constructKeyFromRow(row)); + + context.output(documentBuilder.build()); + } + + /** + * Converts an entire {@code Row} to an appropriate Firestore {@code MapValue}. + * + * @param row {@code Row} to convert. + * @return resulting {@code MapValue}. + */ + private MapValue constructMapFromRow(Schema schema, Row row) { + MapValue.Builder mapValueBuilder = MapValue.newBuilder(); + for (Schema.Field field : schema.getFields()) { + Value val = mapObjectToValue(row.getValue(field.getName())); + mapValueBuilder.putFields(field.getName(), val); + } + return mapValueBuilder.build(); + } + + /** + * Create a random key for a {@code Row} without a keyField or use a user-specified key by + * parsing it from byte array when keyField is set. + * + * @param row {@code Row} to construct a key for. + * @return resulting {@code Key}. + */ + private String constructKeyFromRow(Row row) { + return row.getString(keyField); + } + + /** + * Converts a {@code Row} value to an appropriate Firestore {@code Value} object. + * + * @param value {@code Row} value to convert. + * @return resulting {@code Value}. + * @throws IllegalStateException when no mapping function for object of given type exists. + */ + private Value mapObjectToValue(Object value) { + if (value == null) { + return Value.newBuilder().build(); + } + + if (Boolean.class.equals(value.getClass())) { + return makeValue((Boolean) value).build(); + } else if (Byte.class.equals(value.getClass())) { + return makeValue((Byte) value).build(); + } else if (Long.class.equals(value.getClass())) { + return makeValue((Long) value).build(); + } else if (Short.class.equals(value.getClass())) { + return makeValue((Short) value).build(); + } else if (Integer.class.equals(value.getClass())) { + return makeValue((Integer) value).build(); + } else if (Double.class.equals(value.getClass())) { + return makeValue((Double) value).build(); + } else if (Float.class.equals(value.getClass())) { + return makeValue((Float) value).build(); + } else if (String.class.equals(value.getClass())) { + return makeStringValue((String) value).build(); + } else if (Instant.class.equals(value.getClass())) { + return makeValue(((Instant) value).toDate()).build(); + } else if (byte[].class.equals(value.getClass())) { + return makeValue(ByteString.copyFrom((byte[]) value)).build(); + } else if (value instanceof Row) { + // Recursive conversion to handle nested rows. + Row row = (Row) value; + return makeValue(constructMapFromRow(row.getSchema(), row)).build(); + } else if (value instanceof Collection) { + // Recursive to handle nested collections. + Collection collection = (Collection) value; + List arrayValues = + collection.stream().map(this::mapObjectToValue).collect(Collectors.toList()); + return makeValue(arrayValues).build(); + } + throw new IllegalStateException( + "No conversion exists from type: " + value.getClass() + " to Firestore Value."); + } + } +} From ef11ed5003813e51abbba978ba6997dde34e4852 Mon Sep 17 00:00:00 2001 From: "balazs.nemeth@doctusoft.com" Date: Tue, 15 Nov 2022 09:58:54 +0100 Subject: [PATCH 2/3] GH actions fix --- .../org/apache/beam/sdk/io/gcp/firestore/FirestoreHelper.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreHelper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreHelper.java index 4f5cbb087e0c..cfd14e0320b5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreHelper.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreHelper.java @@ -28,12 +28,10 @@ import java.util.Arrays; import java.util.Date; import java.util.List; -import java.util.logging.Logger; /** Helper methods for {@link Firestore}. */ // TODO: Accept OrBuilders when possible. public final class FirestoreHelper { - private static final Logger logger = Logger.getLogger(FirestoreHelper.class.getName()); private static final int MICROSECONDS_PER_SECOND = 1000 * 1000; private static final int NANOSECONDS_PER_MICROSECOND = 1000; From c960752f4b54a093787e47402b2d24a634e7f3b4 Mon Sep 17 00:00:00 2001 From: "balazs.nemeth@doctusoft.com" Date: Tue, 15 Nov 2022 11:54:50 +0100 Subject: [PATCH 3/3] Unit tests --- .../sdk/io/gcp/firestore/DocumentToRow.java | 12 +- .../sdk/io/gcp/firestore/RowToDocument.java | 18 +- .../DocumentToRowRowToDocumentTest.java | 157 ++++++++++++++++++ 3 files changed, 168 insertions(+), 19 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRowRowToDocumentTest.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRow.java index 2d0c9d8cc2d2..c67efaeb6009 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRow.java @@ -50,16 +50,8 @@ private DocumentToRow(Schema schema, String keyField) { this.schema = schema; this.keyField = keyField; - if (schema.getFieldNames().contains(keyField)) { - if (!schema.getField(keyField).getType().getTypeName().equals(Schema.TypeName.STRING)) { - throw new IllegalStateException( - "Field `" - + keyField - + "` should of type `STRING`. Please change the type or specify a field to" - + " store the KEY value."); - } - LOG.info("Document KEY will be stored under `" + keyField + "` field."); - } + RowToDocument.validateKeyFieldPresenceAndType(schema, keyField); + LOG.info("Document KEY will be stored under `" + keyField + "` field."); } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RowToDocument.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RowToDocument.java index 511caa812a66..c718522ea266 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RowToDocument.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RowToDocument.java @@ -49,20 +49,20 @@ private RowToDocument(String keyField) { this.keyField = keyField; } - @Override - public PCollection expand(PCollection input) { - if (!input - .getSchema() - .getField(keyField) - .getType() - .getTypeName() - .equals(Schema.TypeName.STRING)) { + public static void validateKeyFieldPresenceAndType(Schema schema, String keyField) { + if (!schema.getFieldNames().contains(keyField) + || !schema.getField(keyField).getType().getTypeName().equals(Schema.TypeName.STRING)) { throw new IllegalStateException( "Field `" + keyField - + "` should of type `STRING`. Please change the type or specify a field to" + + "` should be present and of type `STRING`. Please change the type or specify a field to" + " write the KEY value from."); } + } + + @Override + public PCollection expand(PCollection input) { + validateKeyFieldPresenceAndType(input.getSchema(), keyField); LOG.info("Field to use as Document KEY is set to: `" + keyField + "`."); return input.apply(ParDo.of(new RowToDocument.RowToDocumentConverter())); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRowRowToDocumentTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRowRowToDocumentTest.java new file mode 100644 index 000000000000..6c00553ea3fd --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/DocumentToRowRowToDocumentTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.firestore; + +import static org.apache.beam.sdk.io.gcp.firestore.FirestoreHelper.makeStringValue; +import static org.apache.beam.sdk.io.gcp.firestore.FirestoreHelper.makeValue; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTES; +import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME; +import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; +import static org.apache.beam.sdk.schemas.Schema.FieldType.array; + +import com.google.firestore.v1.Document; +import com.google.firestore.v1.MapValue; +import com.google.firestore.v1.Value; +import com.google.protobuf.ByteString; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.UUID; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DocumentToRowRowToDocumentTest { + private static final String KEY = UUID.randomUUID().toString(); + private static final DateTime DATE_TIME = + parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.567"); + static final String DEFAULT_KEY_FIELD = "firestoreKey"; + + final Logger log = Logger.getLogger(DocumentToRowRowToDocumentTest.class.getName()); + + private static final Schema NESTED_ROW_SCHEMA = + Schema.builder().addNullableField("nestedLong", INT64).build(); + private static final Schema SCHEMA = + Schema.builder() + .addNullableField(DEFAULT_KEY_FIELD, STRING) + .addNullableField("long", INT64) + .addNullableField("bool", BOOLEAN) + .addNullableField("datetime", DATETIME) + .addNullableField("array", array(STRING)) + .addNullableField("rowArray", array(FieldType.row(NESTED_ROW_SCHEMA))) + .addNullableField("double", DOUBLE) + .addNullableField("bytes", BYTES) + .addNullableField("string", STRING) + .addNullableField("nullable", INT64) + .build(); + private static final MapValue NESTED_ENTITY = + MapValue.newBuilder().putFields("nestedLong", makeValue(Long.MIN_VALUE).build()).build(); + private static final Document ENTITY = + Document.newBuilder() + .setName(KEY) + .putFields("long", makeValue(Long.MAX_VALUE).build()) + .putFields("bool", makeValue(true).build()) + .putFields("datetime", makeValue(DATE_TIME.toDate()).build()) + .putFields( + "array", makeValue(makeStringValue("string1"), makeStringValue("string2")).build()) + .putFields( + "rowArray", + makeValue(Collections.singletonList(makeValue(NESTED_ENTITY).build())).build()) + .putFields("double", makeValue(Double.MAX_VALUE).build()) + .putFields( + "bytes", makeValue(ByteString.copyFrom("hello", StandardCharsets.UTF_8)).build()) + .putFields("string", makeStringValue("string").build()) + .putFields("nullable", Value.newBuilder().build()) + .build(); + private static final Row ROW = + row( + SCHEMA, + KEY, + Long.MAX_VALUE, + true, + DATE_TIME, + Arrays.asList("string1", "string2"), + Collections.singletonList(row(NESTED_ROW_SCHEMA, Long.MIN_VALUE)), + Double.MAX_VALUE, + "hello".getBytes(StandardCharsets.UTF_8), + "string", + null); + + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testEntityToRowConverter() { + PCollection result = + pipeline.apply(Create.of(ENTITY)).apply(DocumentToRow.create(SCHEMA, DEFAULT_KEY_FIELD)); + PAssert.that(result).containsInAnyOrder(ROW); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testRowToEntityConverter() { + PCollection result = + pipeline + .apply(Create.of(ROW)) + .setRowSchema(SCHEMA) + .apply(RowToDocument.create(DEFAULT_KEY_FIELD)); + PAssert.that(result).containsInAnyOrder(ENTITY); + + pipeline.run().waitUntilFinish(); + } + + @Test(expected = IllegalStateException.class) + public void testConverterWithoutKey() { + Schema schemaWithoutKey = + Schema.builder() + .addFields( + SCHEMA.getFields().stream() + .filter(f -> !f.getName().equals(DEFAULT_KEY_FIELD)) + .collect(Collectors.toList())) + .build(); + RowToDocument.validateKeyFieldPresenceAndType(schemaWithoutKey, DEFAULT_KEY_FIELD); + } + + private static Row row(Schema schema, Object... values) { + return Row.withSchema(schema).addValues(values).build(); + } + + public static DateTime parseTimestampWithUTCTimeZone(String str) { + if (str.indexOf('.') == -1) { + return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC().parseDateTime(str); + } else { + return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS").withZoneUTC().parseDateTime(str); + } + } +}