Skip to content

Commit

Permalink
Some remaining simple operations (#54)
Browse files Browse the repository at this point in the history
* Implement is distinct

* Enable joining with literal conditions

* Add missing string operations

* Stylefix

* Implement and test similar to operation

* Implement some of the date extract functions

* Update documentation

* Added current datetime functions

* Currently there is a problem in the type information of the current_time and localtime. We use localized time in all the cases so far

* Stylefix
  • Loading branch information
nils-braun authored Nov 5, 2020
1 parent 05b08ae commit 09093ce
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 40 deletions.
1 change: 1 addition & 0 deletions conda.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ maven>=3.6.0
pytest>=6.0.1
pytest-cov>=2.10.1
sphinx>=3.2.1
tzlocal>=2.1
fastapi>=0.61.1
uvicorn>=0.11.3
pyarrow>=0.15.1
142 changes: 106 additions & 36 deletions dask_sql/physical/rex/core/call.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
import operator
from functools import reduce
from typing import Any, Union, Callable
Expand All @@ -9,13 +10,15 @@
import dask.dataframe as dd
import dask.array as da
import pandas as pd
from tzlocal import get_localzone

from dask_sql.physical.rex import RexConverter
from dask_sql.physical.rex.base import BaseRexPlugin
from dask_sql.utils import LoggableDataFrame, is_frame
from dask_sql.datacontainer import DataContainer

logger = logging.getLogger(__name__)
SeriesOrScalar = Union[dd.Series, Any]


class Operation:
Expand All @@ -25,7 +28,7 @@ def __init__(self, f: Callable):
"""Init with the given function"""
self.f = f

def __call__(self, *operands) -> Union[dd.Series, Any]:
def __call__(self, *operands) -> SeriesOrScalar:
"""Call the stored function"""
return self.f(*operands)

Expand Down Expand Up @@ -94,11 +97,8 @@ def __init__(self):
super().__init__(self.case)

def case(
self,
where: Union[dd.Series, Any],
then: Union[dd.Series, Any],
other: Union[dd.Series, Any],
) -> Union[dd.Series, Any]:
self, where: SeriesOrScalar, then: SeriesOrScalar, other: SeriesOrScalar,
) -> SeriesOrScalar:
"""
Returns `then` where `where`, else `other`.
"""
Expand All @@ -124,7 +124,7 @@ class IsFalseOperation(Operation):
def __init__(self):
super().__init__(self.false_)

def false_(self, df: Union[dd.Series, Any],) -> Union[dd.Series, Any]:
def false_(self, df: SeriesOrScalar,) -> SeriesOrScalar:
"""
Returns true where `df` is false (where `df` can also be just a scalar).
Returns false on nan.
Expand All @@ -141,7 +141,7 @@ class IsTrueOperation(Operation):
def __init__(self):
super().__init__(self.true_)

def true_(self, df: Union[dd.Series, Any],) -> Union[dd.Series, Any]:
def true_(self, df: SeriesOrScalar,) -> SeriesOrScalar:
"""
Returns true where `df` is true (where `df` can also be just a scalar).
Returns false on nan.
Expand All @@ -158,7 +158,7 @@ class NotOperation(Operation):
def __init__(self):
super().__init__(self.not_)

def not_(self, df: Union[dd.Series, Any],) -> Union[dd.Series, Any]:
def not_(self, df: SeriesOrScalar,) -> SeriesOrScalar:
"""
Returns not `df` (where `df` can also be just a scalar).
"""
Expand All @@ -174,7 +174,7 @@ class IsNullOperation(Operation):
def __init__(self):
super().__init__(self.null)

def null(self, df: Union[dd.Series, Any],) -> Union[dd.Series, Any]:
def null(self, df: SeriesOrScalar,) -> SeriesOrScalar:
"""
Returns true where `df` is null (where `df` can also be just a scalar).
"""
Expand All @@ -184,15 +184,15 @@ def null(self, df: Union[dd.Series, Any],) -> Union[dd.Series, Any]:
return pd.isna(df) or df is None or np.isnan(df)


class LikeOperation(Operation):
"""The like operator (regex for SQL with some twist)"""
class RegexOperation(Operation):
"""An abstract regex operation, which transforms the SQL regex into something python can understand"""

def __init__(self):
super().__init__(self.like)
super().__init__(self.regex)

def like(
self, test: Union[dd.Series, Any], regex: str, escape: str = None,
) -> Union[dd.Series, Any]:
def regex(
self, test: SeriesOrScalar, regex: str, escape: str = None,
) -> SeriesOrScalar:
"""
Returns true, if the string test matches the given regex
(maybe escaped by escape)
Expand All @@ -219,30 +219,15 @@ def like(
if char == "]":
in_char_range = False

elif char == "[":
in_char_range = True

# These chars have a special meaning in regex
# whereas in SQL they have not, so we need to
# add additional escaping
elif char in [
"#",
"$",
"^",
".",
"|",
"~",
"-",
"+",
"*",
"?",
"(",
")",
"{",
"}",
]:
elif char in self.replacement_chars:
char = "\\" + char

elif char == "[":
in_char_range = True

# The needed "\" is printed above, so we continue
elif char == escape:
escaped = True
Expand All @@ -268,6 +253,38 @@ def like(
return bool(re.match(transformed_regex, test))


class LikeOperation(RegexOperation):
    """The LIKE operator, built on top of the shared regex translation."""

    # Characters that get a backslash prepended by the regex translation,
    # listed one character per entry.
    replacement_chars = list("#$^.|~-+*?(){}[]")


class SimilarOperation(RegexOperation):
    """The SIMILAR TO operator, built on top of the shared regex translation."""

    # Characters that get a backslash prepended by the regex translation,
    # listed one character per entry. This set is smaller than the one used
    # for LIKE: "|", "+", "*", "?", brackets, parentheses and braces are
    # not escaped here.
    replacement_chars = list("#$^.~-")


class PositionOperation(Operation):
"""The position operator (get the position of a string)"""

Expand Down Expand Up @@ -354,6 +371,51 @@ def overlay(self, s, replace, start, length=None):
return s


class ExtractOperation(Operation):
    """The EXTRACT operator: get a single date/time field out of a datetime.

    Frame-like inputs are accessed through their ``.dt`` accessor, everything
    else is converted with ``pd.to_datetime`` first, so the same attribute
    names can be used for both cases.
    """

    def __init__(self):
        super().__init__(self.extract)

    def extract(self, what, df: SeriesOrScalar):
        """
        Return the date/time field `what` of `df`.

        `what` is the upper-case name of the field (e.g. "YEAR", "DOW").
        `df` is either a datetime series or a scalar that
        ``pd.to_datetime`` can convert.

        Raises a NotImplementedError for any field name not handled below.
        """
        if is_frame(df):
            df = df.dt
        else:
            df = pd.to_datetime(df)

        if what == "CENTURY":
            return da.trunc(df.year / 100)
        elif what == "DAY":
            return df.day
        elif what == "DECADE":
            return da.trunc(df.year / 10)
        elif what == "DOW":
            # pandas counts Monday as 0; shift so that Sunday becomes 0
            return (df.dayofweek + 1) % 7
        elif what == "DOY":
            return df.dayofyear
        elif what == "HOUR":
            return df.hour
        elif what == "MICROSECOND":
            return df.microsecond
        elif what == "MILLENNIUM":
            return da.trunc(df.year / 1000)
        elif what == "MILLISECOND":
            # 1000 microseconds make up one millisecond, so we need to
            # divide (multiplying would give nanoseconds instead).
            # Like MICROSECOND above, this only covers the sub-second part.
            return da.trunc(df.microsecond / 1000)
        elif what == "MINUTE":
            return df.minute
        elif what == "MONTH":
            return df.month
        elif what == "QUARTER":
            return df.quarter
        elif what == "SECOND":
            return df.second
        elif what == "WEEK":
            # NOTE(review): ``.week`` is deprecated in newer pandas releases
            # in favour of ``isocalendar().week`` - confirm against the
            # pandas version this project pins.
            return df.week
        elif what == "YEAR":
            return df.year
        else:  # pragma: no cover
            raise NotImplementedError(f"Extraction of {what} is not (yet) implemented.")


class RexCallPlugin(BaseRexPlugin):
"""
RexCall is used for expressions, which calculate something.
Expand Down Expand Up @@ -389,6 +451,7 @@ class RexCallPlugin(BaseRexPlugin):
# special operations
"case": CaseOperation(),
"like": LikeOperation(),
"similar to": SimilarOperation(),
"not": NotOperation(),
"is null": IsNullOperation(),
"is not null": NotOperation().of(IsNullOperation()),
Expand Down Expand Up @@ -431,14 +494,21 @@ class RexCallPlugin(BaseRexPlugin):
"overlay": OverlayOperation(),
"substring": SubStringOperation(),
"initcap": TensorScalarOperation(lambda x: x.str.title(), lambda x: x.title()),
# date/time operations
"extract": ExtractOperation(),
"localtime": Operation(lambda *args: pd.Timestamp.now()),
"localtimestamp": Operation(lambda *args: pd.Timestamp.now()),
"current_time": Operation(lambda *args: pd.Timestamp.now()),
"current_date": Operation(lambda *args: pd.Timestamp.now()),
"current_timestamp": Operation(lambda *args: pd.Timestamp.now()),
}

def convert(
self,
rex: "org.apache.calcite.rex.RexNode",
dc: DataContainer,
context: "dask_sql.Context",
) -> Union[dd.Series, Any]:
) -> SeriesOrScalar:
# Prepare the operands by turning the RexNodes into python expressions
operands = [
RexConverter.convert(o, dc, context=context) for o in rex.getOperands()
Expand Down
8 changes: 7 additions & 1 deletion dask_sql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections import defaultdict
from dask_sql.datacontainer import DataContainer
import re
from datetime import datetime
import logging

import dask.dataframe as dd
Expand Down Expand Up @@ -36,7 +37,12 @@ def is_frame(df):
"""
Check if something is a dataframe (and not a scalar or none)
"""
return df is not None and not np.isscalar(df) and not isinstance(df, type(pd.NA))
return (
df is not None
and not np.isscalar(df)
and not isinstance(df, type(pd.NA))
and not isinstance(df, datetime)
)


class Pluggable:
Expand Down
10 changes: 8 additions & 2 deletions docs/pages/sql.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,15 @@ Binary Operations: ``AND``, ``OR``, ``>``, ``>=``, ``<``, ``<=``, ``=``, ``<>``,

Unary Math Operations: ``ABS``, ``ACOS``, ``ASIN``, ``ATAN``, ``ATAN2``, ``CBRT``, ``CEIL``, ``COS``, ``COT``, ``DEGREES``, ``EXP``, ``FLOOR``, ``LOG10``, ``LN``, ``POWER``, ``RADIANS``, ``ROUND``, ``SIGN``, ``SIN``, ``TAN``, ``TRUNCATE``

String operations: ``||``, ``CHAR_LENGTH``, ``UPPER``, ``LOWER``, ``POSITION``, ``TRIM``, ``OVERLAY``, ``SUBSTRING``, ``INITCAP``
String operations: ``LIKE``, ``SIMILAR TO``, ``||``, ``CHAR_LENGTH``, ``UPPER``, ``LOWER``, ``POSITION``, ``TRIM``, ``OVERLAY``, ``SUBSTRING``, ``INITCAP``

Special Operations: ``CASE``, ``LIKE``, ``NOT``, ``IS NULL``, ``IS NOT NULL``, ``IS TRUE``, ``IS NOT TRUE``, ``IS FALSE:``, ``IS NOT FALSE``, ``IS UNKNOWN``, ``IS NOT UNKNOWN``, ``EXISTS``
Date operations: ``EXTRACT``, ``YEAR``, ``QUARTER``, ``MONTH``, ``WEEK``, ``DAYOFYEAR``, ``DAYOFMONTH``, ``DAYOFWEEK``, ``HOUR``, ``MINUTE``, ``SECOND``, ``LOCALTIME``, ``LOCALTIMESTAMP``, ``CURRENT_TIME``, ``CURRENT_DATE``, ``CURRENT_TIMESTAMP``

.. note::

Due to a `bug/inconsistency <https://issues.apache.org/jira/browse/CALCITE-4313>`_ in Apache Calcite, both ``CURRENT_TIME`` and ``LOCALTIME`` return a time without timezone and therefore behave identically.

Special Operations: ``CASE``, ``NOT``, ``IS NULL``, ``IS NOT NULL``, ``IS TRUE``, ``IS NOT TRUE``, ``IS FALSE``, ``IS NOT FALSE``, ``IS UNKNOWN``, ``IS NOT UNKNOWN``, ``EXISTS``

Aggregations
~~~~~~~~~~~~
Expand Down
8 changes: 8 additions & 0 deletions tests/integration/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,14 @@ def test_string_operations(assert_query_gives_same_result):
SELECT
s,
s || 'hello' || s,
s SIMILAR TO '%(b|d)%',
s SIMILAR TO '%(B|c)%',
s SIMILAR TO '%[a-zA-Z]%',
s SIMILAR TO '.*',
s LIKE '%(b|d)%',
s LIKE '%(B|c)%',
s LIKE '%[a-zA-Z]%',
s LIKE '.*',
CHAR_LENGTH(s),
UPPER(s),
LOWER(s),
Expand Down
Loading

0 comments on commit 09093ce

Please sign in to comment.