Support Expressions in HAVING and ORDER BY (#525)
* Sort and Having allow unnamed expressions

* Recursive replace expressions

* order by and having support expressions

* Fix for Spark CI

* Release 0.2.9
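
A hypothetical query exercising the new support, following the PUMS example from the project README (the names, file paths, and privacy budget are the README's; the specific query is illustrative, not part of this commit):

```python
import snsql
from snsql import Privacy
import pandas as pd

privacy = Privacy(epsilon=1.0, delta=0.01)
pums = pd.read_csv('PUMS.csv')
reader = snsql.from_df(pums, privacy=privacy, metadata='PUMS.yaml')

# HAVING and ORDER BY may now contain expressions, not just output column names
result = reader.execute(
    'SELECT sex, AVG(age) AS age FROM PUMS.PUMS '
    'GROUP BY sex HAVING AVG(age) > 40 ORDER BY AVG(age) DESC'
)
```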

Co-authored-by: Joshua <[email protected]>
Co-authored-by: = <=>
joshua-oss authored Jan 6, 2023
1 parent 7e9f093 commit 3d624e8
Showing 10 changed files with 282 additions and 23 deletions.
5 changes: 5 additions & 0 deletions sql/HISTORY.md
@@ -1,3 +1,8 @@
# SmartNoise SQL v0.2.9 Release Notes

* MySql and SQLite readers
* HAVING and ORDER BY allow expressions in addition to columns

# SmartNoise SQL v0.2.8 Release Notes

* Fix bug where integer sums can overflow i32. All engines default to 64-bit integers now.
2 changes: 1 addition & 1 deletion sql/VERSION
@@ -1 +1 @@
0.2.8
0.2.9
2 changes: 1 addition & 1 deletion sql/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "smartnoise-sql"
version = "0.2.8"
version = "0.2.9"
description = "Differentially Private SQL Queries"
authors = ["SmartNoise Team <[email protected]>"]
license = "MIT"
2 changes: 1 addition & 1 deletion sql/setup.py
@@ -25,7 +25,7 @@

setup_kwargs = {
'name': 'smartnoise-sql',
'version': '0.2.8',
'version': '0.2.9',
'description': 'Differentially Private SQL Queries',
'long_description': '[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Python](https://img.shields.io/badge/python-3.7%20%7C%203.8-blue)](https://www.python.org/)\n\n<a href="https://smartnoise.org"><img src="https://github.com/opendp/smartnoise-sdk/raw/main/images/SmartNoise/SVG/Logo%20Mark_grey.svg" align="left" height="65" vspace="8" hspace="18"></a>\n\n## SmartNoise SQL\n\nDifferentially private SQL queries. Tested with:\n* PostgreSQL\n* SQL Server\n* Spark\n* Pandas (SQLite)\n* PrestoDB\n* BigQuery\n\nSmartNoise is intended for scenarios where the analyst is trusted by the data owner. SmartNoise uses the [OpenDP](https://github.com/opendp/opendp) library of differential privacy algorithms.\n\n## Installation\n\n```\npip install smartnoise-sql\n```\n\n## Querying a Pandas DataFrame\n\nUse the `from_df` method to create a private reader that can issue queries against a pandas dataframe.\n\n```python\nimport snsql\nfrom snsql import Privacy\nimport pandas as pd\nprivacy = Privacy(epsilon=1.0, delta=0.01)\n\ncsv_path = \'PUMS.csv\'\nmeta_path = \'PUMS.yaml\'\n\npums = pd.read_csv(csv_path)\nreader = snsql.from_df(pums, privacy=privacy, metadata=meta_path)\n\nresult = reader.execute(\'SELECT sex, AVG(age) AS age FROM PUMS.PUMS GROUP BY sex\')\n```\n\n## Querying a SQL Database\n\nUse `from_connection` to wrap an existing database connection.\n\n```python\nimport snsql\nfrom snsql import Privacy\nimport psycopg2\n\nprivacy = Privacy(epsilon=1.0, delta=0.01)\nmeta_path = \'PUMS.yaml\'\n\npumsdb = psycopg2.connect(user=\'postgres\', host=\'localhost\', database=\'PUMS\')\nreader = snsql.from_connection(pumsdb, privacy=privacy, metadata=meta_path)\n\nresult = reader.execute(\'SELECT sex, AVG(age) AS age FROM PUMS.PUMS GROUP BY sex\')\n```\n\n## Querying a Spark DataFrame\n\nUse `from_connection` to wrap a spark session.\n\n```python\nimport pyspark\nfrom pyspark.sql import SparkSession\nspark = SparkSession.builder.getOrCreate()\nfrom snsql import *\n\npums = spark.read.load(...) # load a Spark DataFrame\npums.createOrReplaceTempView("PUMS_large")\n\nmetadata = \'PUMS_large.yaml\'\n\nprivate_reader = from_connection(\n spark, \n metadata=metadata, \n privacy=Privacy(epsilon=3.0, delta=1/1_000_000)\n)\nprivate_reader.reader.compare.search_path = ["PUMS"]\n\n\nres = private_reader.execute(\'SELECT COUNT(*) FROM PUMS_large\')\nres.show()\n```\n\n## Privacy Cost\n\nThe privacy parameters epsilon and delta are passed in to the private connection at instantiation time, and apply to each computed column during the life of the session. Privacy cost accrues indefinitely as new queries are executed, with the total accumulated privacy cost being available via the `spent` property of the connection\'s `odometer`:\n\n```python\nprivacy = Privacy(epsilon=0.1, delta=10e-7)\n\nreader = from_connection(conn, metadata=metadata, privacy=privacy)\nprint(reader.odometer.spent) # (0.0, 0.0)\n\nresult = reader.execute(\'SELECT COUNT(*) FROM PUMS.PUMS\')\nprint(reader.odometer.spent) # approximately (0.1, 10e-7)\n```\n\nThe privacy cost increases with the number of columns:\n\n```python\nreader = from_connection(conn, metadata=metadata, privacy=privacy)\nprint(reader.odometer.spent) # (0.0, 0.0)\n\nresult = reader.execute(\'SELECT AVG(age), AVG(income) FROM PUMS.PUMS\')\nprint(reader.odometer.spent) # approximately (0.4, 10e-6)\n```\n\nThe odometer is advanced immediately before the differentially private query result is returned to the caller. 
If the caller wishes to estimate the privacy cost of a query without running it, `get_privacy_cost` can be used:\n\n```python\nreader = from_connection(conn, metadata=metadata, privacy=privacy)\nprint(reader.odometer.spent) # (0.0, 0.0)\n\ncost = reader.get_privacy_cost(\'SELECT AVG(age), AVG(income) FROM PUMS.PUMS\')\nprint(cost) # approximately (0.4, 10e-6)\n\nprint(reader.odometer.spent) # (0.0, 0.0)\n```\n\nNote that the total privacy cost of a session accrues at a slower rate than the sum of the individual query costs obtained by `get_privacy_cost`. The odometer accrues all invocations of mechanisms for the life of a session, and uses them to compute total spend.\n\n```python\nreader = from_connection(conn, metadata=metadata, privacy=privacy)\nquery = \'SELECT COUNT(*) FROM PUMS.PUMS\'\nepsilon_single, _ = reader.get_privacy_cost(query)\nprint(epsilon_single) # 0.1\n\n# no queries executed yet\nprint(reader.odometer.spent) # (0.0, 0.0)\n\nfor _ in range(100):\n reader.execute(query)\n\nepsilon_many, _ = reader.odometer.spent\nprint(f\'{epsilon_many} < {epsilon_single * 100}\')\n```\n\n## Histograms\n\nSQL `group by` queries represent histograms binned by grouping key. Queries over a grouping key with unbounded or non-public dimensions expose privacy risk. For example:\n\n```sql\nSELECT last_name, COUNT(*) FROM Sales GROUP BY last_name\n```\n\nIn the above query, if someone with a distinctive last name is included in the database, that person\'s record might accidentally be revealed, even if the noisy count returns 0 or negative. To prevent this from happening, the system will automatically censor dimensions which would violate differential privacy.\n\n## Private Synopsis\n\nA private synopsis is a pre-computed set of differentially private aggregates that can be filtered and aggregated in various ways to produce new reports. Because the private synopsis is differentially private, reports generated from the synopsis do not need to have additional privacy applied, and the synopsis can be distributed without risk of additional privacy loss. Reports over the synopsis can be generated with non-private SQL, within an Excel Pivot Table, or through other common reporting tools.\n\nYou can see a sample [notebook for creating private synopsis](samples/Synopsis.ipynb) suitable for consumption in Excel or SQL.\n\n## Limitations\n\nYou can think of the data access layer as simple middleware that allows composition of `opendp` computations using the SQL language. The SQL language provides a limited subset of what can be expressed through the full `opendp` library. For example, the SQL language does not provide a way to set per-field privacy budget.\n\nBecause we delegate the computation of exact aggregates to the underlying database engines, execution through the SQL layer can be considerably faster, particularly with database engines optimized for precomputed aggregates. However, this design choice means that analysis graphs composed with SQL language do not access data in the engine on a per-row basis. Therefore, SQL queries do not currently support algorithms that require per-row access, such as quantile algorithms that use underlying values. This is a limitation that future releases will relax for database engines that support row-based access, such as Spark.\n\nThe SQL processing layer has limited support for bounding contributions when individuals can appear more than once in the data. 
This includes ability to perform reservoir sampling to bound contributions of an individual, and to scale the sensitivity parameter. These parameters are important when querying reporting tables that might be produced from subqueries and joins, but require caution to use safely.\n\nFor this release, we recommend using the SQL functionality while bounding user contribution to 1 row. The platform defaults to this option by setting `max_contrib` to 1, and should only be overridden if you know what you are doing. Future releases will focus on making these options easier for non-experts to use safely.\n\n\n## Communication\n\n- You are encouraged to join us on [GitHub Discussions](https://github.com/opendp/opendp/discussions/categories/smartnoise)\n- Please use [GitHub Issues](https://github.com/opendp/smartnoise-sdk/issues) for bug reports and feature requests.\n- For other requests, including security issues, please contact us at [[email protected]](mailto:[email protected]).\n\n## Releases and Contributing\n\nPlease let us know if you encounter a bug by [creating an issue](https://github.com/opendp/smartnoise-sdk/issues).\n\nWe appreciate all contributions. Please review the [contributors guide](../contributing.rst). We welcome pull requests with bug-fixes without prior discussion.\n\nIf you plan to contribute new features, utility functions or extensions, please first open an issue and discuss the feature with us.\n',
'author': 'SmartNoise Team',
23 changes: 23 additions & 0 deletions sql/snsql/_ast/tokens.py
@@ -356,6 +356,29 @@ def symbol(self, relations):
        )
        return self

    def replaced(self, old, new, lock=False):
        """
        Replace all instances of an expression in the tree with another expression.
        :param old: the old expression
        :param new: the new expression
        :param lock: if True, the new expression will be locked
            so that it cannot be replaced again
        :return: the updated expression
        """
        if hasattr(self, "_locked") and self._locked:
            return self
        if self == old:
            if lock:
                new._locked = True
            return new
        else:
            props = self.__dict__
            for k, v in props.items():
                if isinstance(v, SqlExpr) and str(v) != '*':
                    props[k] = v.replaced(old, new, lock)
            return self

    @property
    def is_key_count(self):
        return False
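
To make the traversal concrete, here is a standalone sketch of the same recursive-replace pattern, using simplified stand-in classes (`Expr`, `Column`, and `Gt` are illustrative, not the real snsql AST types):

```python
# Simplified stand-ins for SqlExpr subclasses; illustration only.
class Expr:
    def __eq__(self, other):
        return type(self) is type(other) and vars(self) == vars(other)

    def replaced(self, old, new, lock=False):
        if getattr(self, "_locked", False):
            return self           # locked nodes are never rewritten again
        if self == old:
            if lock:
                new._locked = True
            return new            # whole subtree matched: swap it out
        for k, v in vars(self).items():
            if isinstance(v, Expr):
                vars(self)[k] = v.replaced(old, new, lock)  # recurse into children
        return self

class Column(Expr):
    def __init__(self, name):
        self.name = name

class Gt(Expr):  # left > right
    def __init__(self, left, right):
        self.left, self.right = left, right

# Rewrite `avg_age_raw > threshold` to reference the output column `avg_age`
cond = Gt(Column("avg_age_raw"), Column("threshold"))
cond = cond.replaced(Column("avg_age_raw"), Column("avg_age"), lock=True)
assert cond.left.name == "avg_age" and cond.left._locked
```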
40 changes: 21 additions & 19 deletions sql/snsql/sql/private_reader.py
@@ -1,3 +1,4 @@
from copy import deepcopy
from typing import List, Union
import warnings
import numpy as np
@@ -10,7 +11,7 @@
from .private_rewriter import Rewriter
from .parse import QueryParser
from .reader import PandasReader
from .reader.base import SortKey
from .reader.base import SortKeyExpressions

from snsql._ast.ast import Query, Top
from snsql._ast.expressions import sql as ast
@@ -20,6 +21,7 @@
from ._mechanisms import *

import itertools
import string

class PrivateReader(Reader):
"""Executes SQL queries against tabular data sources and returns differentially private results.
@@ -501,6 +503,7 @@ def _execute_ast(self, query, *ignore, accuracy:bool=False, pre_aggregated=None,
        if isinstance(query, str):
            raise ValueError("Please pass AST to _execute_ast.")

        _orig_query = query  # keep the pre-rewrite AST; HAVING/ORDER BY expressions are matched against it
        subquery, query = self._rewrite_ast(query)

        if pre_aggregated is not None:
@@ -591,6 +594,8 @@ def process_clamp_counts(row_in):
        out_syms = query._select_symbols
        out_types = [s.expression.type() for s in out_syms]
        out_col_names = [s.name for s in out_syms]
        # Unnamed output columns ("???") get unique synthetic names so that
        # HAVING/ORDER BY expressions can bind to them.
        bind_prefix = ''.join(np.random.choice(list(string.ascii_lowercase), 5))
        binding_col_names = [name if name != "???" else f"col_{bind_prefix}_{i}" for i, name in enumerate(out_col_names)]

        def convert(val, type):
            if val is None:
@@ -641,12 +646,15 @@ def process_out_row(row):
        out = map(process_out_row, out)

        def filter_aggregate(row, condition):
            bindings = dict((name.lower(), val) for name, val in zip(out_col_names, row[0]))
            bindings = dict((name.lower(), val) for name, val in zip(binding_col_names, row[0]))
            keep = condition.evaluate(bindings)
            return keep

        if query.having is not None:
            condition = query.having.condition
            condition = deepcopy(query.having.condition)
            # Rewrite the HAVING condition against the original query's select
            # list: each select expression found in the condition is replaced
            # by a reference to the output column that carries its value.
            for i, ne in enumerate(_orig_query.select.namedExpressions):
                source_col = binding_col_names[i]
                condition = condition.replaced(ne.expression, ast.Column(source_col), lock=True)
            if hasattr(out, "filter"):
                # it's an RDD
                out = out.filter(lambda row: filter_aggregate(row, condition))
@@ -655,29 +663,23 @@ def filter_aggregate(row, condition):

        # sort it if necessary
        if query.order is not None:
            sort_fields = []
            sort_expressions = []
            for si in query.order.sortItems:
                if type(si.expression) is not ast.Column:
                    raise ValueError("We only know how to sort by column names right now")
                colname = si.expression.name.lower()
                if colname not in out_col_names:
                    raise ValueError(
                        "Can't sort by {0}, because it's not in output columns: {1}".format(
                            colname, out_col_names
                        )
                    )
                colidx = out_col_names.index(colname)
                desc = False
                if si.order is not None and si.order.lower() == "desc":
                    desc = True
                if desc and not (out_types[colidx] in ["int", "float", "boolean", "datetime"]):
                    raise ValueError("We don't know how to sort descending by " + out_types[colidx])
                sf = (desc, colidx)
                sort_fields.append(sf)
                if type(si.expression) is ast.Column and si.expression.name.lower() in out_col_names:
                    sort_expressions.append((desc, si.expression))
                else:
                    # Rewrite the ORDER BY expression to reference only output
                    # columns, so it can be evaluated against each output row.
                    expr = deepcopy(si.expression)
                    for i, ne in enumerate(_orig_query.select.namedExpressions):
                        source_col = binding_col_names[i]
                        expr = expr.replaced(ne.expression, ast.Column(source_col), lock=True)
                    sort_expressions.append((desc, expr))

            def sort_func(row):
                # use index 0, since index 1 is accuracy
                return SortKey(row[0], sort_fields)
                return SortKeyExpressions(row[0], sort_expressions, binding_col_names)

            if hasattr(out, "sortBy"):
                out = out.sortBy(sort_func)
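
An illustrative sketch of the HAVING filter step above, with made-up values and a stand-in for `condition.evaluate`: after the rewrite, the condition refers only to output columns, so each row is kept or dropped by evaluating it against a dict of column-name to value bindings built from the row.

```python
binding_col_names = ["sex", "avg_age"]
rows = [(("m", 44.1), None), (("f", 39.7), None)]  # row[0] = values, row[1] = accuracy

def evaluate(bindings):  # stands in for condition.evaluate after the rewrite
    return bindings["avg_age"] > 40  # i.e. HAVING AVG(age) > 40

kept = [
    row for row in rows
    if evaluate(dict((n.lower(), v) for n, v in zip(binding_col_names, row[0])))
]
assert kept == [(("m", 44.1), None)]
```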
60 changes: 60 additions & 0 deletions sql/snsql/sql/reader/base.py
@@ -147,6 +147,14 @@ def serialize(self, query):
        return str(query)

class SortKey:
    """
    Handles comparison operators for sorting.
    :param obj: the object to be sorted (a row)
    :param sort_fields: a list of tuples, where each tuple is a pair of (bool, int);
        the bool indicates whether the sort is descending (True) or ascending (False),
        and the int indicates the column index to sort on
    """
    def __init__(self, obj, sort_fields, *args):
        self.obj = obj
        self.sort_fields = sort_fields
@@ -181,3 +189,55 @@ def __ge__(self, other):

    def __ne__(self, other):
        return self.mycmp(self.obj, other.obj, self.sort_fields) != 0

class SortKeyExpressions:
    """
    Handles comparison operators for sorting.
    :param obj: the object to be sorted (a row)
    :param sort_expressions: a list of tuples, where each tuple is a pair of
        (bool, SqlExpression); the bool indicates whether the sort is descending
        (True) or ascending (False), and the SqlExpression is evaluated against
        the row to produce the comparison value
    :param binding_col_names: a list of column names used to bind row values
        when evaluating the sort expressions
    """
    def __init__(self, obj, sort_expressions, binding_col_names, *args):
        self.sort_expressions = sort_expressions
        self.bindings = dict((name.lower(), val) for name, val in zip(binding_col_names, obj))

    def mycmp(self, bindings_a, bindings_b, sort_expressions):
        for desc, expr in sort_expressions:
            try:
                v_a = expr.evaluate(bindings_a)
                v_b = expr.evaluate(bindings_b)
                if desc:
                    if v_a < v_b:
                        return 1
                    elif v_a > v_b:
                        return -1
                else:
                    if v_a < v_b:
                        return -1
                    elif v_a > v_b:
                        return 1
            except Exception as e:
                message = f"Error evaluating sort expression {expr}"
                message += "\nWe can only sort using expressions that can be evaluated on output columns."
                raise ValueError(message) from e
        return 0

    def __lt__(self, other):
        return self.mycmp(self.bindings, other.bindings, self.sort_expressions) < 0

    def __gt__(self, other):
        return self.mycmp(self.bindings, other.bindings, self.sort_expressions) > 0

    def __eq__(self, other):
        return self.mycmp(self.bindings, other.bindings, self.sort_expressions) == 0

    def __le__(self, other):
        return self.mycmp(self.bindings, other.bindings, self.sort_expressions) <= 0

    def __ge__(self, other):
        return self.mycmp(self.bindings, other.bindings, self.sort_expressions) >= 0

    def __ne__(self, other):
        return self.mycmp(self.bindings, other.bindings, self.sort_expressions) != 0
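
For intuition, a standalone sketch of the same comparator pattern driving Python's built-in `sorted` (simplified stand-ins: plain callables instead of `SqlExpr` objects, and only `__lt__`, which is all `sorted` needs):

```python
class ExprKey:
    """Rich-comparison sort key: evaluates expressions per row, honoring desc flags."""
    def __init__(self, row, sort_exprs, col_names):
        self.bindings = dict(zip(col_names, row))
        self.sort_exprs = sort_exprs

    def _cmp(self, other):
        for desc, expr in self.sort_exprs:
            a, b = expr(self.bindings), expr(other.bindings)
            if a != b:
                result = 1 if a > b else -1
                return -result if desc else result
        return 0

    def __lt__(self, other):
        return self._cmp(other) < 0

rows = [("m", 44.1), ("f", 39.7), ("x", 44.1)]
cols = ["sex", "avg_age"]
# avg_age descending, then sex ascending
exprs = [(True, lambda b: b["avg_age"]), (False, lambda b: b["sex"])]
ordered = sorted(rows, key=lambda r: ExprKey(r, exprs, cols))
assert ordered == [("m", 44.1), ("x", 44.1), ("f", 39.7)]
```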
2 changes: 1 addition & 1 deletion sql/tests/mechanism/test_approx_bounds.py
@@ -34,7 +34,7 @@ def test_bounds_zero_negative(self):
        assert (max == 0.0)

    def test_bounds_increment(self):
        powers = np.arange(10) * 4
        vals = [2**p for p in powers] * 100
        vals = [2.0**p for p in powers] * 100
        min, max = approx_bounds(vals, 10.0)
        assert (min == 1.0)
        assert (max >= 2**35 and max <= 2**37)
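
Note: the only change here is `2**p` to `2.0**p`, which keeps the test inputs floating point; this is presumably the "Fix for Spark CI" noted in the commit message.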