diff --git a/.circleci/config.yml b/.circleci/config.yml index 1ad18c6..7825b98 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,10 +8,10 @@ workflows: matrix: parameters: python: - - "3.6" - "3.7" - "3.8" - "3.9" + - "3.10" - black jobs: diff --git a/cleancat/__init__.py b/cleancat/__init__.py index e3ffca9..d84098e 100644 --- a/cleancat/__init__.py +++ b/cleancat/__init__.py @@ -20,6 +20,7 @@ URL, ValidationError, ) +from . import chausie __all__ = [ 'Bool', @@ -42,6 +43,7 @@ 'TrimmedString', 'URL', 'ValidationError', + 'chausie', ] -__version__ = '1.0.0' +__version__ = '1.1.0' diff --git a/cleancat/chausie/README.md b/cleancat/chausie/README.md new file mode 100644 index 0000000..dd05872 --- /dev/null +++ b/cleancat/chausie/README.md @@ -0,0 +1,165 @@ +CleanChausie +======== + +Data validation and transformation library for Python. Successor to CleanCat. + +Key features: +* Operate on/with type-checked objects that have good IDE/autocomplete support +* Annotation-based declarations for simple fields +* Composable/reusable fields and field validation logic +* Support (but not require) passing around a context (to avoid global state) + - Context pattern is compatible with explicit sqlalchemy-based session management. i.e. pass in a session when validating +* Cleanly support intra-schema field dependencies (i.e. one field can depend on the validated value of another) +* Explicit nullability/omission parameters +* Errors returned for multiple fields at a time, with field attribution + +## CleanChausie By Example + +### Basic example in Flask + +This is a direct port of the example from the OG cleancat README. + +This shows: +* Annotation-based declarations for simple fields. 
+* Type-checked objects (successful validation results in initialized instances of the schema) + +```python +from typing import List +from cleancat.chausie.field import ( + field, emailfield, listfield, urlfield, ValidationError, +) +from cleancat.chausie.schema import Schema +from flask import app, request, jsonify + +class JobApplication(Schema): + first_name: str + last_name: str + email: str = field(emailfield()) + urls: List[str] = field(listfield(urlfield(default_scheme='http://'))) + +@app.route('/job_application', methods=['POST']) +def test_view(): + result = JobApplication.clean(request.json) + if isinstance(result, ValidationError): + return jsonify({'errors': [{'msg': e.msg, 'field': e.field} for e in result.errors] }), 400 + + # Now "result" has the validated data, in the form of a `JobApplication` instance. + assert isinstance(result, JobApplication) + name = f'{result.first_name} {result.last_name}' +``` + +### Explicit Nullability + +TODO revisit omission defaults so that they match the annotation + +```python +from typing import Optional, Union +from cleancat.chausie.consts import OMITTED +from cleancat.chausie.field import field, strfield, Optional as CCOptional, Required +from cleancat.chausie.schema import Schema + +class NullabilityExample(Schema): + # auto defined based on annotations + nonnull_required: str + nullable_omittable: Optional[str] + + # manually specified + nonnull_omittable: Union[str, OMITTED] = field(strfield, nullability=CCOptional(allow_none=False)) + nullable_required: Optional[str] = field(strfield, nullability=Required(allow_none=True)) +``` + +### Composable/Reusable Fields + +```python +from typing import Union +from cleancat.chausie.field import field, Field, strfield, intfield, Error +from cleancat.chausie.schema import Schema + +@field(parents=(strfield,)) +def trimmed_string(value: str) -> str: + return value.strip() + +def max_val(max_value: int) -> Field: + @field() + def _max_val(value: int) -> Union[int, Error]: 
+ if value > max_value: + return Error(msg=f'value is above allowed max of {max_value}') + return value + return _max_val + +def min_val(min_value: int) -> Field: + @field() + def _min_val(value: int) -> Union[int, Error]: + if value < min_value: + return Error(msg=f'value is below allowed min of {min_value}') + return value + return _min_val + +def constrained_int(min: int, max: int) -> Field: + return field(parents=(intfield, min_val(min), max_val(max)))() + +class ReusableFieldsExampleSchema(Schema): + first_name: str = trimmed_string + age: int = field(parents=(intfield, min_val(0)))() + score: int = constrained_int(min=0, max=100) +``` + +### Context Support + +```python +import attrs +from cleancat.chausie.field import field, strfield +from cleancat.chausie.schema import Schema + +class MyModel: # some ORM model + id: str + created_by: 'User' + +@attrs.frozen +class Context: + authenticated_user: 'User' # the User making a request + session: 'Session' # active ORM Session + +class ContextExampleSchema(Schema): + @field(parents=(strfield,), accepts=('id',)) + def obj(self, value: str, context: Context) -> MyModel: + return ( + context.session + .query(MyModel) + .filter(MyModel.created_by == context.authenticated_user.id) + .filter(MyModel.id == value) + ) + +with atomic() as session: + result = ContextExampleSchema.clean( + data={'id': 'mymodel_primarykey'}, + context=Context(authenticated_user=EXAMPLE_USER, session=session) + ) +assert isinstance(result, ContextExampleSchema) +assert isinstance(result.obj, MyModel) +``` + + +### Intra-schema Field dependencies + +```python +from cleancat.chausie.field import field +from cleancat.chausie.schema import Schema + +class DependencyExampleSchema(Schema): + a: str + b: str + + @field() + def a_and_b(self, a: str, b: str) -> str: + return f'{a}::{b}' + + +result = DependencyExampleSchema.clean( + data={'a': 'a', 'b': 'b'}, +) +assert isinstance(result, DependencyExampleSchema) +assert result.a_and_b == 'a::b' +``` + 
def convert_attrib_to_field(attrib: attr.Attribute) -> Field:
    """Convert attr Attribute to cleanchausie Field.

    The field is derived from the attribute's type annotation when one is
    present; otherwise a pass-through required field with no validators is
    created. A declared attrs default makes the field optional, and an attrs
    validator (if any) is appended as a final validation step.

    Args:
        attrib: The attrs attribute to convert.

    Returns:
        A `Field` equivalent to the given attribute.
    """
    if attrib.type:
        field = field_def_from_annotation(attrib.type)
        assert field
    else:
        field = Field(
            validators=(),
            accepts=(),
            nullability=Required(),
            depends_on=(),
            serialize_to=None,
            serialize_func=lambda v: v,
        )

    # attrs marks "no default" with the NOTHING sentinel. Compare against the
    # sentinel rather than relying on truthiness, so that falsy defaults such
    # as 0, "", False or None still make the field optional.
    # NOTE(review): an `attr.Factory` default is stored as-is as the omitted
    # value here; it is not invoked.
    if attrib.default is not attr.NOTHING:
        nullability = Optional(omitted_value=attrib.default)
        field = attr.evolve(field, nullability=nullability)

    if attrib.validator:

        def _validate(value):
            try:
                # no ability to validate against other values on the
                # instance (since no instance exists yet), but should
                # support simple validation cases.
                attrib.validator(None, attrib, value)
                return value
            except Exception as e:
                # attrs validators signal failure by raising; surface the
                # message as a field-level error instead of propagating.
                return Error(msg=str(e))

        new_validators = (field.validators or ()) + (_validate,)
        field = attr.evolve(field, validators=new_validators)

    return field
@overload
def wrap_result(field: Tuple[Union[str, int], ...], result: Error) -> Error:
    ...


@overload
def wrap_result(field: Tuple[Union[str, int], ...], result: Value) -> Value:
    ...


def wrap_result(
    field: Tuple[Union[str, int], ...], result: Any
) -> Union[Value, Error]:
    """Normalize a validator result into a `Value` or a field-attributed `Error`.

    Args:
        field: Path of the field the result belongs to.
        result: An `Error`, a `Value`, or a bare validated value.

    Returns:
        For an `Error`, a copy whose field path is prefixed with `field`.
        A `Value` is returned untouched; anything else is wrapped in `Value`.
    """
    if isinstance(result, Error):
        prefixed_path = field + result.field
        return attr.evolve(result, field=prefixed_path)

    if isinstance(result, Value):
        # already wrapped, nothing to do
        return result

    return Value(value=result)
    def run_validators(
        self,
        field: Tuple[Union[str, int], ...],
        value: Any,
        context: Any,
        intermediate_results: Dict[str, Any],
    ) -> Union[Value, Errors]:
        """Run this field's validators, in order, against `value`.

        Args:
            field: Path used to attribute any resulting errors.
            value: The raw input; may be `omitted` or None.
            context: Caller-supplied context, or the `empty` sentinel.
            intermediate_results: Results of already-evaluated fields, keyed
                by field name. Entries whose name matches a validator
                parameter are injected into that validator (via `.value`).

        Returns:
            The validated value wrapped in `Value`, or `Errors` attributed
            to `field`.

        Raises:
            ValueError: A validator takes `context`, no context was given,
                and the parameter's default is not the `empty` sentinel.
            TypeError: The value was omitted and the nullability is neither
                `Required` nor `Optional`.
        """

        def _get_deps(func):
            # names of every parameter the validator declares
            return {
                param for param in inspect.signature(func).parameters.keys()
            }

        # handle nullability
        # Only applies when at least one validator consumes "value"; fields
        # computed purely from other fields skip this check.
        if value in (omitted, None) and any(
            ["value" in _get_deps(v) for v in self.validators]
        ):
            if value is None:
                if self.nullability.allow_none:
                    # None is accepted as-is; validators are not run
                    return Value(value)
                else:
                    if isinstance(self.nullability, Required):
                        msg = "Value is required, and must not be None."
                    else:
                        msg = "Value must not be None."

                    return Errors(
                        field=field,
                        errors=[Error(msg=msg)],
                    )

            # from here on the value was omitted entirely
            if isinstance(self.nullability, Required):
                return Errors(
                    field=field, errors=[Error(msg="Value is required.")]
                )
            elif isinstance(self.nullability, Optional):
                # substitute the configured stand-in; validators are not run
                return Value(self.nullability.omitted_value)
            else:
                raise TypeError

        def inject_deps(func, val):
            # Bind the parameters the validator asks for (other fields'
            # values, context, value) and return a zero-argument callable.
            deps = _get_deps(func)
            if not deps:
                return func

            # an empty context default value means its optional/passthrough
            if (
                "context" in deps
                and context is empty
                and inspect.signature(func).parameters["context"].default
                is not empty
            ):
                raise ValueError(
                    "Context is required for evaluating this schema."
                )

            return functools.partial(
                func,
                **{
                    dep: v.value
                    for dep, v in intermediate_results.items()
                    if dep in deps
                },
                **{
                    dep: v
                    for dep, v in {"context": context, "value": val}.items()
                    if dep in deps
                },
            )

        # Each validator receives the previous validator's output; the first
        # error short-circuits the chain.
        result = value
        for validator in self.validators:
            result = inject_deps(func=validator, val=result)()
            if isinstance(result, Errors):
                return Errors(field=field, errors=result.flatten())
            elif isinstance(result, Error):
                return Errors(field=field, errors=[result])
            elif isinstance(result, UnvalidatedWrappedValue):
                # Wrapper fields defer validation of their items: validate
                # each one with the inner field, attributing errors by index.
                inner_results = [
                    result.inner_field.run_validators(
                        field=(idx,),
                        value=inner_value,
                        context=context,
                        intermediate_results=intermediate_results,
                    )
                    for idx, inner_value in enumerate(result.value)
                ]
                flattened_errors = []
                for r in inner_results:
                    if isinstance(r, Error):
                        flattened_errors.append(r)
                    elif isinstance(r, Errors):
                        flattened_errors.extend(r.flatten())
                errors = Errors(field=field, errors=flattened_errors)
                if errors.errors:
                    return errors
                else:
                    # construct result with the validated inner data
                    result = result.construct(inner_results)

        return wrap_result(field=field, result=result)
def field(
    decorated_func: T_Optional[Union[Callable, Field]] = None,
    *,
    parents: Tuple[Union[Callable, Field], ...] = tuple(),
    accepts: Tuple[str, ...] = tuple(),
    serialize_to: T_Optional[str] = None,
    serialize_func: Callable = noop,
    nullability: Nullability = Required(),
) -> Union[Callable[[Callable], Field], Field]:
    """Build a `Field`, either directly or as a decorator.

    When `decorated_func` is given the finished `Field` is returned. When only
    keyword arguments are given, a builder is returned instead; calling it
    with a function (or `Field`, or nothing) produces the `Field`.

    Args:
        decorated_func: Validation function (or existing `Field`) that runs
            after all parents.
        parents: Parent validators/fields. Values flow through them in the
            order given before reaching this field's own function. For a
            `Field` parent only its validators are reused.
        accepts: Input names to read the value from, in order of precedence.
            Defaults to the attribute name on the schema.
        serialize_to: Output name used during serialization. Defaults to the
            attribute name on the schema.
        serialize_func: Transformation applied to the value during
            serialization. Defaults to a passthrough.
        nullability: Controls handling of omitted/None input. Defaults to
            `Required`.
    """

    def _outer_field(inner_func: Union[Callable, Field, None] = None) -> Field:
        # Expand the parent chain (plus the wrapped function, or a
        # passthrough) into one flat list of validator callables.
        chain = parents + (inner_func or noop,)
        validators: List[Callable] = []
        for link in chain:
            if isinstance(link, Field):
                validators.extend(link.validators)
            else:
                validators.append(link)

        # Every validator parameter other than the two reserved names is a
        # dependency on another field of the same schema.
        reserved = {"context", "value"}
        deps = set()
        for validator in validators:
            param_names = inspect.signature(validator).parameters.keys()
            deps.update(name for name in param_names if name not in reserved)

        return Field(
            nullability=nullability,
            validators=tuple(validators),
            accepts=accepts,
            serialize_to=serialize_to,
            serialize_func=serialize_func,
            depends_on=tuple(deps),
        )

    if decorated_func is None:
        return _outer_field
    return _outer_field(inner_func=decorated_func)
@field(parents=(strfield,))
def datetimefield(value: str) -> Union[datetime.datetime, Error]:
    """Parse a datetime from its string representation.

    Args:
        value: The string to parse (already validated as `str` by the
            `strfield` parent).

    Returns:
        The parsed `datetime.datetime`, or an `Error` when the string cannot
        be parsed.
    """
    # TODO should this reject naive datetimes? or assume a timezone?
    try:
        # TODO should we use ciso8601 to parse? It's a bit stricter, but much faster.
        return parser.parse(value)
    except (ValueError, OverflowError):
        # dateutil raises OverflowError (not a ValueError subclass) when the
        # parsed date exceeds the platform's integer range; report it as a
        # validation error rather than letting it escape.
        return Error(msg="Could not parse datetime.")
% scheme_part + regex = ( + r"^%s([-%s@:%%_+.~#?&/\\=]{1,256}%s|([0-9]{1,3}\.){3}[0-9]{1,3})(:[0-9]+)?([/?].*)?$" + % (scheme_part, alpha_numeric_and_symbols_ranges, tld_part) + ) + regex_flags = re.IGNORECASE | re.UNICODE + + def compile_schemes_to_regexes(schemes): + return [ + re.compile("^" + normalize_scheme(sch) + ".*", re.IGNORECASE) + for sch in schemes + ] + + allowed_schemes = allowed_schemes or [] + allowed_schemes_regexes = compile_schemes_to_regexes(allowed_schemes) + + disallowed_schemes = disallowed_schemes or [] + disallowed_schemes_regexes = compile_schemes_to_regexes(disallowed_schemes) + + @field(parents=(regexfield(regex=regex, flags=regex_flags),)) + def _urlfield(value: str) -> Union[Error, str]: + if not scheme_regex.match(value): + value = default_scheme + value + + if allowed_schemes: + if not any( + allowed_regex.match(value) + for allowed_regex in allowed_schemes_regexes + ): + allowed_schemes_text = " or ".join(allowed_schemes) + return Error( + msg=( + "This URL uses a scheme that's not allowed. You can only " + f"use {allowed_schemes_text}." 
def emailfield(max_length=254) -> Field:
    """Build a field that validates (and trims) an email address.

    The value must be a string, is stripped of surrounding whitespace, must
    not exceed `max_length` characters after stripping, and must match the
    email pattern (case-insensitive).

    Args:
        max_length: Maximum allowed length of the trimmed value.

    Returns:
        A `Field` chaining the string check, trimming/length check and
        pattern check.
    """
    email_regex = (
        r'^(?:[^\.@\s]|[^\.@\s]\.(?!\.))*[^.@\s]@'
        r'[^.@\s](?:[^\.@\s]|\.(?!\.))*\.[a-z]{2,63}$'
    )
    regex_flags = re.IGNORECASE

    def _email_field(value: str) -> Union[str, Error]:
        # trim any leading/trailing whitespace before validating the email
        ret = value.strip()

        # only allow up to max_length
        if len(ret) > max_length:
            return Error(msg=f"Email exceeds max length of {max_length}")

        return ret

    return field(
        noop,
        parents=(
            strfield,
            _email_field,
            regexfield(regex=email_regex, flags=regex_flags),
        ),
    )
def _check_for_dependency_loops(fields: Dict[str, Field]) -> None:
    """Verify that top-level field dependencies can be resolved in some order.

    Repeatedly picks a field whose dependencies have all been resolved
    already ("self" counts as resolved from the start). Wrapped (inner)
    fields are not inspected.

    Raises:
        ValueError: If a pass over the remaining fields resolves nothing new.
    """
    pending = {
        field_name: set(field_def.depends_on)
        for field_name, field_def in fields.items()
    }
    resolved = {"self"}

    while pending:
        size_before = len(resolved)

        # first remaining field whose requirements are all satisfied
        ready = next(
            (name for name, needs in pending.items() if needs <= resolved),
            None,
        )
        if ready is not None:
            resolved.add(ready)
            del pending[ready]

        if len(resolved) == size_before:
            # no progress was made
            raise ValueError(
                "Field dependencies could not be resolved. "
                f"Seen fields: {resolved}; Remaining Deps: {pending}"
            )
@attr.frozen
class SchemaDefinition:
    """Frozen container for the fields that make up a schema."""

    # Field definitions keyed by field name.
    fields: Dict[str, Field]
def getter(dict_or_obj, field_name, default):
    """Read `field_name` from a dict (by key) or from any other object (by attribute).

    Args:
        dict_or_obj: The source to read from.
        field_name: Key or attribute name to look up.
        default: Returned when the key/attribute is absent.

    Returns:
        The stored value, or `default` when it is missing.
    """
    if isinstance(dict_or_obj, dict):
        return dict_or_obj.get(field_name, default)

    # Non-dict sources are read by attribute. The lookup result must be
    # returned explicitly; otherwise every object input would yield None and
    # an absent attribute could not be told apart from an explicit None.
    return getattr(dict_or_obj, field_name, default)
def my_view(request_json) -> Dict:
    """Clean a job-application payload and build the view's response dict.

    Returns ``{'errors': [...]}`` (one ``msg``/``field`` entry per error)
    when cleaning fails, otherwise a summary of the validated application.
    """
    cleaned = JobApplication.clean(request_json)

    if not isinstance(cleaned, ValidationError):
        # A non-error result is the validated data as a `JobApplication`.
        assert isinstance(cleaned, JobApplication)
        full_name = f'{cleaned.first_name} {cleaned.last_name}'
        return {
            'name': full_name,
            'contact': cleaned.email,
            'hmu': cleaned.urls,
        }

    failures = []
    for err in cleaned.errors:
        failures.append({'msg': err.msg, 'field': err.field})
    return {'errors': failures}
# Schema covering the four required/omittable x nullable/non-null
# combinations exercised by `test_nullability_example`.
class NullabilityExample(Schema):
    # auto defined based on annotations
    # `str`: omission -> "Value is required."; None -> "Value is required,
    # and must not be None." (see the parametrized cases).
    nonnull_required: str
    # `Optional[str]`: may be left out of the payload, and None is accepted.
    nullable_omittable: Optional[str]

    # manually specified
    # May be omitted, but an explicit None is rejected with
    # "Value must not be None."
    nonnull_omittable: Union[str, OMITTED] = field(
        strfield, nullability=CCOptional(allow_none=False)
    )
    # Must be present in the payload, but None is an accepted value.
    nullable_required: Optional[str] = field(
        strfield, nullability=Required(allow_none=True)
    )
def test_basic():
    """A schema definition derived from an attrs class cleans a plain dict."""

    @attr.frozen()
    class AnnotatedValue:
        value: int
        unit: str

    payload = {'value': 10, 'unit': 'inches'}
    schema_def: SchemaDefinition = schema_def_from_attrs_class(AnnotatedValue)

    cleaned = clean(schema_def, data=payload)

    # The function-level `clean` hands back a dict rather than an instance.
    assert isinstance(cleaned, dict)
    value, unit = cleaned['value'], cleaned['unit']
    assert value == 10
    assert unit == 'inches'
def test_reusable_fields():
    """End-to-end example: REST-level cleaning followed by a service-level schema.

    `UpdateLeadRestSchema` validates the raw request shape; `UpdateLead` then
    resolves the organization and lead through repositories carried on the
    context, using reusable validator functions as field parents.
    """
    # Imported locally because this module's top-level import only brings in
    # `Error`; a failed `clean` yields a `ValidationError` carrying `.errors`.
    from cleancat.chausie.field import ValidationError

    @attr.frozen
    class User:
        pk: str
        name: str
        active: bool = True

    @attr.frozen
    class Organization:
        pk: str
        name: str
        members: List[str]

    @attr.frozen
    class Lead:
        pk: str
        org_id: str
        name: str
        website: str

    billy = User(pk="user_billy", name="Billy")
    joe = User(pk="user_joe", name="Joe")
    charlie = User(pk="user_charlie", name="Charlie")
    dave = User(pk="user_dave", name="Dave", active=False)

    org_a = Organization(
        pk="orga_a",
        name="Organization A",
        members=[billy.pk, dave.pk],
    )
    org_b = Organization(
        pk="orga_b",
        name="Organization B",
        members=[billy.pk, joe.pk],
    )

    ibm = Lead(pk="lead_ibm", name="IBM", website="ibm.com", org_id=org_a.pk)

    class UserRepo:
        USERS_BY_PK = {u.pk: u for u in [billy, joe, charlie]}

        def get_by_pk(self, pk):
            return self.USERS_BY_PK.get(pk, None)

    class OrganizationRepo:
        ORGANIZATIONS_BY_PK = {o.pk: o for o in [org_a, org_b]}

        def get_by_pk(self, pk):
            return self.ORGANIZATIONS_BY_PK.get(pk, None)

    class LeadRepo:
        # Class-level storage on purpose: a fresh `LeadRepo()` at the end of
        # the test must observe what `add` stored through another instance.
        LEADS_BY_PK = {lead.pk: lead for lead in [ibm]}

        def get_by_pk(self, pk):
            return self.LEADS_BY_PK.get(pk, None)

        def add(self, lead):
            self.LEADS_BY_PK[lead.pk] = lead

    @attr.frozen
    class Context:
        current_user: User
        user_repo: UserRepo
        org_repo: OrganizationRepo
        lead_repo: LeadRepo

    def lookup_org(value: str, context: Context) -> Union[Organization, Error]:
        org = context.org_repo.get_by_pk(value)
        if org:
            return org

        return Error(msg="Organization not found.")

    def validate_org_visibility(
        value: Organization, context: Context
    ) -> Union[Organization, Error]:
        if not context.current_user.active:
            # probably would want to do this when constructing the context
            return Error(msg="User is not active.")

        if context.current_user.pk not in value.members:
            return Error(msg="User cannot access organization.")
        return value

    class UpdateLeadRestSchema(Schema):
        pk: str
        name: Optional[str]
        website: str
        organization: str

    class UpdateLead(Schema):
        name: Optional[str]
        website: Optional[str]
        organization: Organization = field(
            noop,
            parents=(lookup_org, validate_org_visibility),
        )

        # Reads the incoming "pk" key and depends on the validated
        # `organization` field through its parameter name.
        @field(parents=(strfield,), accepts=("pk",))
        def obj(
            value: str, context: Context, organization: Organization
        ) -> Union[Lead, Error]:
            lead = context.lead_repo.get_by_pk(pk=value)
            # The repo returns None for unknown pks; report that as the same
            # "not found" error instead of failing on `None.org_id`.
            if lead is None or lead.org_id != organization.pk:
                return Error(msg="Lead not found.")
            return lead

    # service function
    def update_lead_as_user(pk, name, website, org_id, as_user_id):
        user_repo = UserRepo()
        lead_repo = LeadRepo()
        context = Context(
            current_user=user_repo.get_by_pk(as_user_id),
            org_repo=OrganizationRepo(),
            user_repo=user_repo,
            lead_repo=lead_repo,
        )
        spec = UpdateLead.clean(
            data={
                "pk": pk,
                "name": name,
                "website": website,
                "organization": org_id,
            },
            context=context,
        )
        # A failed clean is a ValidationError (which carries `.errors`), not a
        # single Error, so this is the type that has to be checked here.
        if isinstance(spec, ValidationError):
            raise ValueError(",".join([e.msg for e in spec.errors]))

        # Only apply attributes the caller actually supplied.
        changes = {
            attr_name: getattr(spec, attr_name)
            for attr_name in ["name", "website"]
            if getattr(spec, attr_name) is not omitted
        }
        lead = attr.evolve(spec.obj, **changes)
        lead_repo.add(lead)
        return lead

    result = UpdateLeadRestSchema.clean(
        data={
            "pk": "lead_ibm",
            "organization": "orga_a",
            "website": "newibm.com",
        },
    )
    assert isinstance(result, UpdateLeadRestSchema)
    assert result.pk == "lead_ibm"
    assert result.organization == org_a.pk
    assert result.name is omitted
    assert result.website == "newibm.com"

    new_lead = update_lead_as_user(
        pk=result.pk,
        org_id=result.organization,
        name=result.name,
        website=result.website,
        as_user_id=billy.pk,
    )
    assert new_lead.pk == "lead_ibm"
    assert new_lead.name == "IBM"
    assert new_lead.website == "newibm.com"

    # was actually persisted with the lead repo
    refetched_lead = LeadRepo().get_by_pk(new_lead.pk)
    assert refetched_lead.pk == new_lead.pk
    assert refetched_lead.website == new_lead.website
@pytest.fixture
def example_schema():
    """Provide a small schema class with one aliased int and one bounded int."""

    class ExampleSchema(Schema):
        # Also reads the value from the legacy "deprecated_int" key.
        myint = field(intfield, accepts=("myint", "deprecated_int"))

        @field(parents=(intfield,))
        def mylowint(value: int) -> Union[int, Error]:
            # Guard clause: anything from 5 upwards is rejected.
            if value >= 5:
                return Error(msg="Needs to be less than 5")
            return value

    return ExampleSchema
class TestListField:
    """Behaviour of `listfield` wrapping an inner per-item field."""

    def test_listfield_basic(self):
        """A list of strings is returned unchanged."""

        class UserSchema(Schema):
            aliases = field(listfield(field(strfield)))

        result = UserSchema.clean({"aliases": ["John", "Johnny"]})
        assert isinstance(result, UserSchema)
        assert result.aliases == ["John", "Johnny"]

    def test_listfield_empty(self):
        """An empty list is a valid value and stays an empty list."""

        class UserSchema(Schema):
            aliases = field(listfield(field(strfield)))

        # This test used to send the same non-empty payload as
        # `test_listfield_basic`, so the empty case was never exercised.
        result = UserSchema.clean({"aliases": []})
        assert isinstance(result, UserSchema)
        assert result.aliases == []

    def test_listfield_inner_optional(self):
        """With an optional inner field, None items are allowed in the list."""

        class UserSchema(Schema):
            aliases = field(
                listfield(field(strfield, nullability=CCOptional()))
            )

        result = UserSchema.clean({"aliases": ["John", None]})
        assert isinstance(result, UserSchema)
        assert result.aliases == ["John", None]

    def test_listfield_chained(self):
        """A listfield used as a parent feeds its cleaned list to the field."""

        @attr.frozen
        class Alias:
            value: str

        class UserSchema(Schema):
            @field(parents=(listfield(field(strfield)),))
            def aliases(value: List[str]) -> List[Alias]:
                return [Alias(v) for v in value]

        result = UserSchema.clean({"aliases": ["John", "Johnny"]})
        assert isinstance(result, UserSchema)
        assert result.aliases == [Alias(value="John"), Alias(value="Johnny")]

    def test_listfield_parent_context(self):
        """The context passed to `clean` reaches the per-item validator."""

        @attr.frozen
        class Context:
            valid_suffixes: Set[str]

        def validate_suffix(value: str, context: Context):
            if value not in context.valid_suffixes:
                return Error(msg="Suffix is invalid")
            return value

        class UserSchema(Schema):
            suffixes = field(
                listfield(field(validate_suffix, parents=(strfield,)))
            )

        context_ = Context(valid_suffixes={"Sr", "Jr", "2nd"})
        result = UserSchema.clean({"suffixes": ["Sr", "Jr"]}, context=context_)
        assert isinstance(result, UserSchema)
        assert result.suffixes == ["Sr", "Jr"]

    def test_listfield_context(self):
        """A field validating the whole list also receives the context."""

        @attr.frozen
        class Context:
            valid_suffixes: Set[str]

        class UserSchema(Schema):
            @field(parents=(listfield(field(strfield)),))
            def suffixes(
                value: List[str], context: Context
            ) -> Union[List[str], Error]:
                for suffix in value:
                    if suffix not in context.valid_suffixes:
                        return Error(msg="Suffix is invalid")
                return value

        context_ = Context(valid_suffixes={"Sr", "Jr", "2nd"})
        result = UserSchema.clean({"suffixes": ["Sr", "Jr"]}, context=context_)
        assert isinstance(result, UserSchema)
        assert result.suffixes == ["Sr", "Jr"]
class TestNullability:
    """Omission and None handling for fields auto-defined from annotations."""

    def test_required_omitted(self):
        """Leaving out a plain `int` field is reported as required."""

        class MySchema(Schema):
            myint: int

        payload = {}
        outcome = MySchema.clean(payload)
        assert isinstance(outcome, ValidationError)
        expected_errors = [Error(msg="Value is required.", field=("myint",))]
        assert outcome.errors == expected_errors

    def test_required_none(self):
        """An explicit None for a plain `int` field is rejected."""

        class MySchema(Schema):
            myint: int

        payload = {"myint": None}
        outcome = MySchema.clean(payload)
        assert isinstance(outcome, ValidationError)
        expected_error = Error(
            msg="Value is required, and must not be None.",
            field=("myint",),
        )
        assert outcome.errors == [expected_error]

    def test_optional_omitted(self):
        """An omitted `Optional[int]` field comes back as `omitted`."""

        class MySchema(Schema):
            myint: Optional[int]

        payload = {}
        outcome = MySchema.clean(payload)
        assert isinstance(outcome, MySchema)
        assert outcome.myint is omitted

    def test_optional_none(self):
        """An explicit None for an `Optional[int]` field is preserved."""

        class MySchema(Schema):
            myint: Optional[int]

        payload = {"myint": None}
        outcome = MySchema.clean(payload)
        assert isinstance(outcome, MySchema)
        assert outcome.myint is None
def test_extendable_fields():
    """A field that has its own parents can itself be used as a parent."""

    # Reusable/composable fields may declare parents of their own.
    # TODO should this be a different function that makes it clearer this only applies parents as validators?
    @field(parents=(strfield,))
    def valuefield(value: str):
        return f"Value:{value}"

    class MySchema(Schema):
        @field(parents=(valuefield,))
        def a(self, value: str):
            return f"a:{value}"

    payload = {"a": "John"}
    cleaned = MySchema.clean(payload)

    assert isinstance(cleaned, MySchema)
    # The parent's prefix is applied before the schema field's own prefix.
    expected = "a:Value:John"
    assert cleaned.a == expected
+ ], + ) + def test_it_rejects_invalid_urls(self, value): + class MyUrlSchema(Schema): + url = field(urlfield()) + + result = MyUrlSchema.clean({"url": value}) + assert isinstance(result, ValidationError) + assert result.errors == [Error(msg="Invalid input.", field=("url",))] + + @pytest.mark.parametrize( + "value, expected", + [ + ("http://example.com/a?b=c", "http://example.com/a?b=c"), + ("ftp://ftp.example.com", "ftp://ftp.example.com"), + ("www.example.com", "http://www.example.com"), + ("invalid", None), + ], + ) + def test_it_supports_a_default_scheme(self, value, expected): + class MyUrlSchema(Schema): + url = field(urlfield(default_scheme="http://")) + + result = MyUrlSchema.clean({"url": value}) + if expected: + assert isinstance(result, MyUrlSchema) + assert result.url == expected + else: + assert isinstance(result, ValidationError) + assert result.errors == [ + Error(msg="Invalid input.", field=("url",)) + ] + + @pytest.mark.parametrize( + "value, expected", + [ + ("https://example.com/", "https://example.com/"), + ("example.com/", "https://example.com/"), + ("http://example.com", None), + ], + ) + def test_it_enforces_allowed_schemes(self, value, expected): + class MyUrlSchema(Schema): + url = field( + urlfield( + default_scheme="https://", allowed_schemes=["https://"] + ) + ) + + result = MyUrlSchema.clean({"url": value}) + if expected: + assert isinstance(result, MyUrlSchema) + assert result.url == expected + else: + expected_err_msg = ( + "This URL uses a scheme that's not allowed. You can only " + "use https://." 
+ ) + assert isinstance(result, ValidationError) + assert result.errors == [ + Error(msg=expected_err_msg, field=("url",)) + ] + + @pytest.mark.parametrize( + "value, expected", + [ + ("https://example.com/", "https://example.com/"), + ("ftp://ftp.example.com", "ftp://ftp.example.com"), + ("example.com/", "https://example.com/"), + ("javascript://www.example.com/#%0aalert(document.cookie)", None), + ], + ) + def test_it_enforces_disallowed_schemes(self, value, expected): + class MyUrlSchema(Schema): + url = field( + urlfield( + default_scheme="https://", + disallowed_schemes=["javascript:"], + ) + ) + + result = MyUrlSchema.clean({"url": value}) + if expected: + assert isinstance(result, MyUrlSchema) + assert result.url == expected + else: + assert isinstance(result, ValidationError) + assert result.errors == [ + Error( + msg="This URL uses a scheme that's not allowed.", + field=("url",), + ) + ] + + @pytest.mark.parametrize( + "value, expected", + [ + ("https://example.com/", "https://example.com/"), + ("example.com/", "https://example.com/"), + ("ftps://storage.example.com", "ftps://storage.example.com"), + ], + ) + def test_it_supports_simpler_allowed_scheme_values(self, value, expected): + class MyUrlSchema(Schema): + url = field( + urlfield( + default_scheme="https", allowed_schemes=["https", "ftps"] + ) + ) + + result = MyUrlSchema.clean({"url": value}) + assert isinstance(result, MyUrlSchema) + assert result.url == expected + + @pytest.mark.parametrize("value", [23.0, True]) + def test_it_enforces_valid_data_type(self, value): + class MyUrlSchema(Schema): + url = field( + urlfield( + default_scheme="https", allowed_schemes=["https", "ftps"] + ) + ) + + result = MyUrlSchema.clean({"url": value}) + assert isinstance(result, ValidationError) + assert result.errors == [Error(msg="Unhandled type", field=("url",))] diff --git a/tests/chausie/test_schema.py b/tests/chausie/test_schema.py new file mode 100644 index 0000000..83648ef --- /dev/null +++ 
def test_field_dependencies_error():
    """When a field fails, only that field's error is reported for its dependents."""

    @attr.frozen
    class B:
        val: str

    class UpdateObject(Schema):
        @field()
        def a(value: str) -> Union[str, Error]:
            return Error(msg="nope")

        # `b` takes the cleaned value of `a` as its input.
        @field()
        def b(a: str) -> B:
            return B(val=a)

    payload = {"a": "A"}
    outcome = UpdateObject.clean(data=payload)

    assert isinstance(outcome, ValidationError)
    expected_errors = [Error(msg="nope", field=("a",))]
    assert outcome.errors == expected_errors
def test_def_using_old_fields():
    """Old-style CleanCat field instances can be declared on a new Schema."""

    class MySchema(Schema):
        # base fields
        mystring: str = String()
        mybool: bool = Bool()
        myint: int = Integer()
        mylist: List[str] = OldCCList(String())

        # nullable fields
        nullstring: Optional[str] = String(required=False)
        omittedstring: Optional[str] = String(required=False)

        # old fields can be inter-mixed with new-style fields
        other_string: str

    # "omittedstring" is deliberately left out of the payload.
    payload = {
        "mystring": "asdf",
        "mybool": True,
        "myint": 10,
        "mylist": ["asdf"],
        "nullstring": None,
        "other_string": "the other string",
    }
    cleaned = MySchema.clean(data=payload)

    assert isinstance(cleaned, MySchema)
    assert cleaned.mystring == "asdf"
    assert cleaned.mybool is True
    assert cleaned.myint == 10
    assert cleaned.mylist == ["asdf"]
    assert cleaned.nullstring is None
    # The omitted old-style String field is expected to come back as "".
    assert cleaned.omittedstring == ""
    assert cleaned.other_string == "the other string"