Skip to content

Commit 62a3bbb

Browse files
committed
Zyp Treatments: Add normalize_complex_lists option
Because CrateDB can not store lists of varying objects, try to normalize them, currently biased towards strings and floats.
1 parent f9d5926 commit 62a3bbb

File tree

4 files changed

+129
-1
lines changed

4 files changed

+129
-1
lines changed

src/zyp/model/treatment.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import builtins
12
import typing as t
23

34
from attr import Factory
@@ -13,6 +14,7 @@ class Treatment(Dumpable):
1314
convert_list: t.List[str] = Factory(list)
1415
convert_string: t.List[str] = Factory(list)
1516
convert_dict: t.List[t.Dict[str, str]] = Factory(list)
17+
normalize_complex_lists: bool = False
1618
prune_invalid_date: t.List[str] = Factory(list)
1719

1820
def apply(self, data: DictOrList) -> DictOrList:
@@ -28,7 +30,7 @@ def apply_record(self, data: Record) -> Record:
2830
local_ignores = []
2931
if self.ignore_complex_lists:
3032
for k, v in data.items():
31-
if isinstance(v, list) and v and isinstance(v[0], dict):
33+
if self.is_list_of_dicts(v):
3234
# Skip ignoring special-encoded items.
3335
if v[0] and list(v[0].keys())[0].startswith("$"):
3436
continue
@@ -39,6 +41,12 @@ def apply_record(self, data: Record) -> Record:
3941
if ignore_name in data:
4042
del data[ignore_name]
4143

44+
# Apply normalization for lists of objects.
45+
if self.normalize_complex_lists:
46+
for _, v in data.items():
47+
if self.is_list_of_dicts(v):
48+
ListOfVaryingObjectsNormalizer(v).apply()
49+
4250
# Converge certain items to `list` even when defined differently.
4351
for to_list_name in self.convert_list:
4452
if to_list_name in data and not isinstance(data[to_list_name], list):
@@ -66,3 +74,58 @@ def apply_record(self, data: Record) -> Record:
6674
del data[key]
6775

6876
return data
77+
78+
@staticmethod
79+
def is_list_of_dicts(v: t.Any) -> bool:
80+
return isinstance(v, list) and bool(v) and isinstance(v[0], dict)
81+
82+
83+
@define
84+
class NormalizerRule:
85+
"""
86+
Manage details of a normalizer rule.
87+
"""
88+
89+
name: str
90+
converter: t.Callable
91+
92+
93+
@define
94+
class ListOfVaryingObjectsNormalizer:
95+
"""
96+
CrateDB can not store lists of varying objects, so try to normalize them.
97+
"""
98+
99+
data: Collection
100+
101+
def apply(self):
102+
self.apply_rules(self.get_rules(self.type_stats()))
103+
104+
def apply_rules(self, rules: t.List[NormalizerRule]) -> None:
105+
for item in self.data:
106+
for rule in rules:
107+
name = rule.name
108+
if name in item:
109+
item[name] = rule.converter(item[name])
110+
111+
def get_rules(self, statistics) -> t.List[NormalizerRule]:
112+
rules = []
113+
for name, types in statistics.items():
114+
if len(types) > 1:
115+
rules.append(NormalizerRule(name=name, converter=self.get_best_converter(types)))
116+
return rules
117+
118+
def type_stats(self) -> t.Dict[str, t.List[str]]:
119+
types: t.Dict[str, t.List[str]] = {}
120+
for item in self.data:
121+
for key, value in item.items():
122+
types.setdefault(key, []).append(type(value).__name__)
123+
return types
124+
125+
@staticmethod
126+
def get_best_converter(types: t.List[str]) -> t.Callable:
127+
if "str" in types:
128+
return builtins.str
129+
if "float" in types and "int" in types and "str" not in types:
130+
return builtins.float
131+
return lambda x: x

tests/transform/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
RESET_TABLES = [
44
"from.dynamodb",
5+
"from.generic",
56
]
67

78

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import pytest
2+
3+
from commons_codec.model import SQLOperation
4+
from zyp.model.treatment import Treatment
5+
6+
7+
@pytest.mark.integration
8+
def test_normalize_list_of_objects(caplog, cratedb):
9+
"""
10+
Verify writing record to CrateDB, with transformations.
11+
"""
12+
13+
record_in = {
14+
"_list_float_int": [{"abc": 42.42}, {"abc": 42}],
15+
"_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2, "abc": None}],
16+
"_list_int_str": [{"abc": 123}, {"abc": "123"}],
17+
}
18+
19+
record_out = {
20+
"_list_float_int": [{"abc": 42.42}, {"abc": 42.0}],
21+
"_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2}],
22+
"_list_int_str": [{"abc": "123"}, {"abc": "123"}],
23+
}
24+
25+
# Define CrateDB SQL DDL and DML operations (SQL+parameters).
26+
operation_ddl = SQLOperation('CREATE TABLE "from".generic (data OBJECT(DYNAMIC))', None)
27+
operation_dml = SQLOperation('INSERT INTO "from".generic (data) VALUES (:data)', {"data": record_in})
28+
29+
# Apply treatment to parameters.
30+
parameters = operation_dml.parameters
31+
Treatment(normalize_complex_lists=True).apply(parameters)
32+
33+
# Insert into CrateDB.
34+
cratedb.database.run_sql(operation_ddl.statement)
35+
cratedb.database.run_sql(operation_dml.statement, parameters)
36+
37+
# Verify data in target database.
38+
assert cratedb.database.refresh_table("from.generic") is True
39+
40+
results = cratedb.database.run_sql('SELECT * FROM "from".generic;', records=True) # noqa: S608
41+
assert results[0]["data"] == record_out

tests/zyp/test_treatment.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,29 @@ def test_treatment_ignore_fields():
7070
assert transformation.apply([{"data": [{"abc": 123}]}]) == [{"data": [{}]}]
7171

7272

73+
def test_treatment_normalize_complex_lists_success():
74+
"""
75+
Verify normalizing lists of objects works.
76+
"""
77+
transformation = Treatment(normalize_complex_lists=True)
78+
assert transformation.apply([{"data": [{"abc": 123.42}, {"abc": 123}]}]) == [
79+
{"data": [{"abc": 123.42}, {"abc": 123.0}]}
80+
]
81+
assert transformation.apply([{"data": [{"abc": 123}, {"abc": "123"}]}]) == [
82+
{"data": [{"abc": "123"}, {"abc": "123"}]}
83+
]
84+
85+
86+
def test_treatment_normalize_complex_lists_passthrough():
87+
"""
88+
When no normalization rule can be applied, return input 1:1.
89+
"""
90+
transformation = Treatment(normalize_complex_lists=True)
91+
assert transformation.apply([{"data": [{"abc": 123.42}, {"abc": None}]}]) == [
92+
{"data": [{"abc": 123.42}, {"abc": None}]}
93+
]
94+
95+
7396
def test_treatment_convert_string():
7497
"""
7598
Verify treating nested data to convert values into strings works.

0 commit comments

Comments
 (0)