|
| 1 | +# |
| 2 | +# Licensed to the Apache Software Foundation (ASF) under one or more |
| 3 | +# contributor license agreements. See the NOTICE file distributed with |
| 4 | +# this work for additional information regarding copyright ownership. |
| 5 | +# The ASF licenses this file to You under the Apache License, Version 2.0 |
| 6 | +# (the "License"); you may not use this file except in compliance with |
| 7 | +# the License. You may obtain a copy of the License at |
| 8 | +# |
| 9 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +# |
| 11 | +# Unless required by applicable law or agreed to in writing, software |
| 12 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | +# See the License for the specific language governing permissions and |
| 15 | +# limitations under the License. |
| 16 | +# |
| 17 | + |
| 18 | +from typing import NamedTuple |
| 19 | + |
| 20 | + |
| 21 | +class Row(object): |
| 22 | + """A dynamic schema'd row object. |
| 23 | +
|
| 24 | + This objects attributes are initialized from the keywords passed into its |
| 25 | + constructor, e.g. Row(x=3, y=4) will create a Row with two attributes x and y. |
| 26 | +
|
| 27 | + More importantly, when a Row object is returned from a `Map`, `FlatMap`, or |
| 28 | + `DoFn` type inference is able to deduce the schema of the resulting |
| 29 | + PCollection, e.g. |
| 30 | +
|
| 31 | + pc | beam.Map(lambda x: Row(x=x, y=0.5 * x)) |
| 32 | +
|
| 33 | + when applied to a PCollection of ints will produce a PCollection with schema |
| 34 | + `(x=int, y=float)`. |
| 35 | +
|
| 36 | + Note that in Beam 2.30.0 and later, Row objects are sensitive to field order. |
| 37 | + So `Row(x=3, y=4)` is not considered equal to `Row(y=4, x=3)`. |
| 38 | + """ |
| 39 | + def __init__(self, **kwargs): |
| 40 | + self.__dict__.update(kwargs) |
| 41 | + |
| 42 | + def as_dict(self): |
| 43 | + return dict(self.__dict__) |
| 44 | + |
| 45 | + # For compatibility with named tuples. |
| 46 | + _asdict = as_dict |
| 47 | + |
| 48 | + def __iter__(self): |
| 49 | + for _, value in self.__dict__.items(): |
| 50 | + yield value |
| 51 | + |
| 52 | + def __repr__(self): |
| 53 | + return 'Row(%s)' % ', '.join('%s=%r' % kv for kv in self.__dict__.items()) |
| 54 | + |
| 55 | + def __hash__(self): |
| 56 | + return hash(self.__dict__.items()) |
| 57 | + |
| 58 | + def __eq__(self, other): |
| 59 | + if type(self) == type(other): |
| 60 | + other_dict = other.__dict__ |
| 61 | + elif type(other) == type(NamedTuple): |
| 62 | + other_dict = other._asdict() |
| 63 | + else: |
| 64 | + return False |
| 65 | + return ( |
| 66 | + len(self.__dict__) == len(other_dict) and |
| 67 | + all(s == o for s, o in zip(self.__dict__.items(), other_dict.items()))) |
| 68 | + |
| 69 | + def __reduce__(self): |
| 70 | + return _make_Row, tuple(self.__dict__.items()) |
| 71 | + |
| 72 | + |
| 73 | +def _make_Row(*items): |
| 74 | + return Row(**dict(items)) |
0 commit comments