Feature constraints v2 #262

Closed
wants to merge 12 commits into from
15 changes: 12 additions & 3 deletions CHANGELOG.md
@@ -3,6 +3,14 @@
## Change History
All notable changes to the Databricks Labs Data Generator will be documented in this file.

### Unreleased

#### Changed
* Modified the data generator to allow specification of constraints on the data generation process

#### Added
* Added classes for constraints on data generation via the new package `dbldatagen.constraints`


### Version 0.3.6 Post 1

@@ -13,7 +21,6 @@ All notable changes to the Databricks Labs Data Generator will be documented in
#### Fixed
* Fixed scenario where `DataAnalyzer` is used on dataframe containing a column named `summary`


### Version 0.3.6

#### Changed
@@ -25,6 +32,7 @@ All notable changes to the Databricks Labs Data Generator will be documented in
* This version marks the change of the minimum supported Databricks runtime to 10.4 LTS and later releases.
* While there are no known incompatibilities with Databricks 9.1 LTS, we will not test against this release


### Version 0.3.5

#### Changed
@@ -33,6 +41,7 @@ All notable changes to the Databricks Labs Data Generator will be documented in
* Added ``withStructColumn`` method to allow simplified generation of struct and JSON columns
* Modified pipfile to use newer version of package specifications


### Version 0.3.4 Post 3

#### Changed
@@ -44,7 +53,6 @@ All notable changes to the Databricks Labs Data Generator will be documented in
* Fix for use of values in columns of type array, map and struct
* Fix for generation of arrays via `numFeatures` and `structType` attributes when numFeatures has value of 1


### Version 0.3.4 Post 1

#### Fixed
@@ -116,6 +124,7 @@ Thanks to Marvin Schenkel for the contribution
#### Notes
* column definitions for map, struct or array must use `expr` attribute to initialize field. Defaults to `NULL`


### Version 0.3.0

#### Changes
@@ -127,6 +136,7 @@ Thanks to Marvin Schenkel for the contribution
* Updated to Spark 3.2.1 or later
* Unit test updates - migration from `unittest` to `pytest` for many tests


### Version 0.2.1

#### Features
@@ -150,7 +160,6 @@ Thanks to Marvin Schenkel for the contribution
* Use of data generator to generate static and streaming data sources in Databricks Delta Live Tables
* added support for install from PyPi


### General Requirements

See the file `python/require.txt` for the Python package dependencies
40 changes: 40 additions & 0 deletions dbldatagen/constraints/__init__.py
@@ -0,0 +1,40 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This package defines the constraints classes for the `dbldatagen` library.

The constraint classes define predefined constraints that may be applied to constrain the generated data.

Constraining the generated data is implemented in several ways:

- Rejection of rows that do not meet the criteria
- Modifying the generated data to meet the constraint (including modifying the data generation parameters)

Some constraints may be implemented using a combination of the above approaches.

For implementations using the rejection approach, the data generation process may generate fewer than the
requested number of rows.

In the current implementation, most of the constraint strategies are implemented using rejection-based criteria.
"""

from .chained_relation import ChainedRelation
from .constraint import Constraint
from .literal_range_constraint import LiteralRange
from .literal_relation_constraint import LiteralRelation
from .negative_values import NegativeValues
from .positive_values import PositiveValues
from .ranged_values_constraint import RangedValues
from .sql_expr import SqlExpr
from .unique_combinations import UniqueCombinations

__all__ = ["chained_relation",
"constraint",
"negative_values",
"literal_range_constraint",
"literal_relation_constraint",
"positive_values",
"ranged_values_constraint",
"unique_combinations"]
63 changes: 63 additions & 0 deletions dbldatagen/constraints/chained_relation.py
@@ -0,0 +1,63 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This module defines the ChainedRelation class
"""
import pyspark.sql.functions as F
from .constraint import Constraint


class ChainedRelation(Constraint):
"""ChainedRelation constraint

Constrains one or more columns so that each column has a relationship to the next.

For example, if the constraint is defined as `ChainedRelation(['a', 'b', 'c'], "<")` then only rows that
satisfy the condition `a < b < c` will be included in the output
(where `a`, `b` and `c` represent the data values for the rows).

This can be used to model time-related transactions (for example, in retail, where the purchaseDate, shippingDate
and returnDate all have a specific ordering relationship).

Relations supported include <, <=, >=, >, !=, ==

:param columns: column name or list of column names
:param relation: operator to check - should be one of `<`, `>`, `=`, `>=`, `<=`, `==`, `!=`
"""
def __init__(self, columns, relation):
"""

:param columns: List of columns across which to apply the relation
:param relation: relation to test for
"""
Constraint.__init__(self)
self._relation = relation
self._columns = self._columnsFromListOrString(columns)

if relation not in self.SUPPORTED_OPERATORS:
raise ValueError(f"Parameter `relation` should be one of the operators :{self.SUPPORTED_OPERATORS}")

if not isinstance(self._columns, list) or len(self._columns) <= 1:
raise ValueError("ChainedRelation constraints must be defined across more than one column")

def _generate_filter_expression(self):
""" Generated composite filter expression for chained set of filter expressions

I.e., if columns is ['a', 'b', 'c'] and relation is '<',

create the set of filters [ col('a') < col('b'), col('b') < col('c')]
and combine them into a single expression using the logical `and` operation

:return: filter expression for chained expressions
"""
expressions = [F.col(colname) for colname in self._columns]

filters = []
# build set of filters for chained expressions
for ix in range(1, len(expressions)):
filters.append(self._generate_relation_expression(expressions[ix - 1], self._relation, expressions[ix]))

# ... and combine them using logical `and` operation
return self.combineConstraintExpressions(filters)
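
A short sketch (not in the diff) of the chained filter in action, again assuming a local Spark session and applying `filterExpression` directly to a dataframe rather than through a data generator:

```python
from pyspark.sql import SparkSession

from dbldatagen.constraints import ChainedRelation

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 2, 3), (5, 4, 6), (7, 8, 9)], ["a", "b", "c"])

# only rows satisfying a < b < c are retained
ordered = ChainedRelation(["a", "b", "c"], "<")

# equivalent to (col('a') < col('b')) & (col('b') < col('c'));
# the (5, 4, 6) row is rejected because 5 < 4 is false
df.where(ordered.filterExpression).show()
```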
116 changes: 116 additions & 0 deletions dbldatagen/constraints/constraint.py
@@ -0,0 +1,116 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This module defines the Constraint class
"""
import types


class Constraint(object):
""" Constraint object - base class for predefined and custom constraints

This class is meant for internal use only.
"""

SUPPORTED_OPERATORS = ["<", ">", ">=", "!=", "==", "=", "<=", "<>"]

def __init__(self):
"""

"""
self._filterExpression = None
self._calculatedFilterExpression = False

def _columnsFromListOrString(self, columns):
""" Get columns as list of columns from string of list-like

:param columns: string or list of strings representing column names
"""
if isinstance(columns, str):
return [columns]
elif isinstance(columns, (list, set, tuple, types.GeneratorType)):
return list(columns)
else:
raise ValueError("Columns must be a string or list of strings")

def _generate_relation_expression(self, column, relation, valueExpression):
""" Generate comparison expression

:param column: Column to generate comparison against
:param relation: relation to implement
:param valueExpression: expression to compare to
:return: relation expression as variation of Pyspark SQL columns
"""
if relation == ">":
return column > valueExpression
elif relation == ">=":
return column >= valueExpression
elif relation == "<":
return column < valueExpression
elif relation == "<=":
return column <= valueExpression
elif relation in ["!=", "<>"]:
return column != valueExpression
elif relation in ["=", "=="]:
return column == valueExpression
else:
raise ValueError(f"Unsupported relation type '{relation}")

@classmethod
def combineConstraintExpressions(cls, constraintExpressions):
""" Combine constraint expressions

:param constraintExpressions: list of constraint expressions
:return: combined constraint expression
"""
assert constraintExpressions is not None and isinstance(constraintExpressions, list), \
"Constraint expressions must be a list of constraint expressions"

if len(constraintExpressions) > 0:
constraint_expression = constraintExpressions[0]

for additional_constraint in constraintExpressions[1:]:
constraint_expression = constraint_expression & additional_constraint

return constraint_expression
else:
raise ValueError("Invalid list of constraint expressions")

def prepareDataGenerator(self, dataGenerator):
""" Prepare the data generator to generate data that matches the constraint

This method may modify the data generation rules to meet the constraint

:param dataGenerator: Data generation object that will generate the dataframe
:return: modified or unmodified data generator
"""
return dataGenerator

def transformDataframe(self, dataGenerator, dataFrame):
""" Transform the dataframe to make data conform to constraint if possible

This method should not modify the dataGenerator - but may modify the dataframe

:param dataGenerator: Data generation object that generated the dataframe
:param dataFrame: generated dataframe
:return: modified or unmodified Spark dataframe

The default transformation returns the dataframe unmodified

"""
return dataFrame

def _generate_filter_expression(self):
""" Generate a Pyspark expression that may be used for filtering"""
return None

@property
def filterExpression(self):
""" Return the filter expression (as instance of type Column that evaluates to True or non-True)"""
if not self._calculatedFilterExpression:
self._filterExpression = self._generate_filter_expression()
self._calculatedFilterExpression = True
return self._filterExpression
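
To illustrate the extension point the base class exposes, here is a hypothetical custom constraint (not part of this PR); it only reuses helpers shown above - `_columnsFromListOrString`, `combineConstraintExpressions` and the inherited `filterExpression` property:

```python
import pyspark.sql.functions as F

from dbldatagen.constraints import Constraint


class NonNullValues(Constraint):
    """ Hypothetical constraint that rejects rows where any listed column is NULL """

    def __init__(self, columns):
        Constraint.__init__(self)
        self._columns = self._columnsFromListOrString(columns)

    def _generate_filter_expression(self):
        # one filter per column, combined with a logical `and` by the base class helper
        filters = [F.col(colname).isNotNull() for colname in self._columns]
        return self.combineConstraintExpressions(filters)
```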
45 changes: 45 additions & 0 deletions dbldatagen/constraints/literal_range_constraint.py
@@ -0,0 +1,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This module defines the LiteralRange class
"""
import pyspark.sql.functions as F

from .constraint import Constraint


class LiteralRange(Constraint):
""" LiteralRange Constraint object - validates that column value(s) are between 2 literal values

:param columns: Name of column or list of column names
:param lowValue: Lower bound - column values must be greater than this value (greater than or equal to it if `strict` is False)
:param highValue: Upper bound - column values must be less than this value (less than or equal to it if `strict` is False)
:param strict: If True, excludes the low and high values from the valid range. Defaults to False

Note `lowValue` and `highValue` must be values that can be converted to a literal expression using the
`pyspark.sql.functions.lit` function
"""

def __init__(self, columns, lowValue, highValue, strict=False):
Constraint.__init__(self)
self._columns = self._columnsFromListOrString(columns)
self._lowValue = lowValue
self._highValue = highValue
self._strict = strict

def _generate_filter_expression(self):
""" Generate a SQL filter expression that may be used for filtering"""
expressions = [F.col(colname) for colname in self._columns]
minValue = F.lit(self._lowValue)
maxValue = F.lit(self._highValue)

# build ranged comparison expressions
if self._strict:
filters = [(column_expr > minValue) & (column_expr < maxValue) for column_expr in expressions]
else:
filters = [column_expr.between(minValue, maxValue) for column_expr in expressions]

# ... and combine them using logical `and` operation
return self.combineConstraintExpressions(filters)
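
A brief sketch (not from the diff) contrasting the strict and non-strict forms of `LiteralRange`; constructing the constraint objects does not require an active Spark session:

```python
from dbldatagen.constraints import LiteralRange

# non-strict (default): bounds are included, i.e. 0 <= score <= 100
inclusive = LiteralRange("score", lowValue=0, highValue=100)

# strict: bounds are excluded, i.e. 0 < score < 100
exclusive = LiteralRange("score", lowValue=0, highValue=100, strict=True)

# both expose a pyspark Column via `filterExpression`,
# e.g. df.where(inclusive.filterExpression)
```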
37 changes: 37 additions & 0 deletions dbldatagen/constraints/literal_relation_constraint.py
@@ -0,0 +1,37 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This module defines the LiteralRelation class
"""
import pyspark.sql.functions as F

from .constraint import Constraint


class LiteralRelation(Constraint):
"""LiteralRelation constraint

Constrains one or more columns so that each column has the specified relationship to a constant value

:param columns: column name or list of column names
:param relation: operator to check - should be one of `<`, `>`, `=`, `>=`, `<=`, `==`, `!=`
:param value: a literal value to compare against
"""

def __init__(self, columns, relation, value):
Constraint.__init__(self)
self._columns = self._columnsFromListOrString(columns)
self._relation = relation
self._value = value

if relation not in self.SUPPORTED_OPERATORS:
raise ValueError(f"Parameter `relation` should be one of the operators :{self.SUPPORTED_OPERATORS}")

def _generate_filter_expression(self):
expressions = [F.col(colname) for colname in self._columns]
literalValue = F.lit(self._value)
filters = [self._generate_relation_expression(col, self._relation, literalValue) for col in expressions]

return self.combineConstraintExpressions(filters)
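
Finally, a usage sketch (not part of the diff) for `LiteralRelation`, which compares every listed column against the same literal value; the dataframe and Spark session are assumed for illustration:

```python
from pyspark.sql import SparkSession

from dbldatagen.constraints import LiteralRelation

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(10, 20), (-5, 30), (7, 0)], ["qty", "price"])

# every listed column must satisfy the relation against the literal 0
non_negative = LiteralRelation(["qty", "price"], ">=", 0)

# keeps only rows where qty >= 0 AND price >= 0; the (-5, 30) row is rejected
df.where(non_negative.filterExpression).show()
```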