Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Refactor using curry_flip from Expression library #682

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ repos:
repo: local
- hooks:
- id: mypy
exclude: (^docs/|^examples/|^notebooks/|^tests/|^reactivex/operators/_\w.*\.py$)
exclude: (^docs/|^examples/|^notebooks/|^tests/|^reactivex/operators/_\w.*\.py$|^reactivex/curry\.py$)
repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.942

7 changes: 7 additions & 0 deletions docs/contributing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ Please register any issues to `ReactiveX/RxPY/issues <https://github.com/Reactiv

Please submit any pull requests against the
`master <https://github.com/ReactiveX/RxPY/tree/master>`_ branch.

Before submitting the pull requets, be sure to run

.. code:: shell

poetry run pre-commit run --all-files --show-diff-on-failure
poetry run coverage run -m pytest && poetry run coverage report -m
24 changes: 24 additions & 0 deletions docs/get_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,30 @@ Output:
Received delta
Received epsilon

Version x.x.x introduces the ``curry_flip`` decorator to make the creation of custom
operators less verbose:

.. code:: python

import reactivex

@curry_flip(1)
def lowercase(source):
def subscribe(observer, scheduler = None):
def on_next(value):
observer.on_next(value.lower())

return source.subscribe(
on_next,
observer.on_error,
observer.on_completed,
scheduler=scheduler)
return reactivex.create(subscribe)

When building more complex operators which take arguments or even optional arguments,
``curry_flip`` allows to always keep the ``source`` as first argument in the definition.


Concurrency
-----------

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ float_to_top=true
python_version = "3.9"
follow_imports = "silent"
files = ["reactivex"]
exclude = ["reactivex/operators/_\\w.*\\.py$"] # mypy will eventually catch up
exclude = ["reactivex/operators/_\\w.*\\.py$", "reactivex/curry\\.py$"] # mypy will eventually catch up
disallow_any_generics = true
disallow_untyped_defs = true

Expand Down
218 changes: 218 additions & 0 deletions reactivex/curry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# This file is borrowed from the Expression library, licensed under the MIT license.
# https://github.com/cognitedata/Expression
from typing import Any, Callable, Tuple, TypeVar, overload

from typing_extensions import Concatenate, Literal, ParamSpec

_P = ParamSpec("_P")
_A = TypeVar("_A")
_B = TypeVar("_B")
_C = TypeVar("_C")
_D = TypeVar("_D")
_E = TypeVar("_E")

_Arity = Literal[0, 1, 2, 3, 4]


def _curry(
args: Tuple[Any, ...], arity: int, fun: Callable[..., Any]
) -> Callable[..., Any]:
def wrapper(*args_: Any, **kw: Any) -> Any:
if arity == 1:
return fun(*args, *args_, **kw)
return _curry(args + args_, arity - 1, fun)

return wrapper


@overload
def curry(num_args: Literal[0]) -> Callable[[Callable[_P, _B]], Callable[_P, _B]]:
...


@overload
def curry(
num_args: Literal[1],
) -> Callable[[Callable[Concatenate[_A, _P], _B]], Callable[[_A], Callable[_P, _B]]]:
...


@overload
def curry(
num_args: Literal[2],
) -> Callable[
[Callable[Concatenate[_A, _B, _P], _C]],
Callable[
[_A],
Callable[
[_B],
Callable[_P, _C],
],
],
]:
...


@overload
def curry(
num_args: Literal[3],
) -> Callable[
[Callable[Concatenate[_A, _B, _C, _P], _D]],
Callable[
[_A],
Callable[
[_B],
Callable[
[_C],
Callable[_P, _D],
],
],
],
]:
...


@overload
def curry(
num_args: Literal[4],
) -> Callable[
[Callable[Concatenate[_A, _B, _C, _D, _P], _E]],
Callable[
[_A],
Callable[
[_B],
Callable[
[_C],
Callable[[_D], Callable[_P, _E]],
],
],
],
]:
...


def curry(num_args: _Arity) -> Callable[..., Any]:
"""A curry decorator.

Makes a function curried.

Args:
num_args: The number of args to curry from the start of the
function

Example:
>>> @curry(1)
... def add(a: int, b: int) -> int:
... return a + b
>>>
>>> assert add(3)(4) == 7
"""

def wrapper(fun: Callable[..., Any]) -> Callable[..., Any]:
return _curry((), num_args + 1, fun)

return wrapper


@overload
def curry_flip(
num_args: Literal[0],
) -> Callable[[Callable[_P, _A]], Callable[_P, _A]]:
...


@overload
def curry_flip(
num_args: Literal[1],
) -> Callable[[Callable[Concatenate[_A, _P], _B]], Callable[_P, Callable[[_A], _B]]]:
...


@overload
def curry_flip(
num_args: Literal[2],
) -> Callable[
[Callable[Concatenate[_A, _B, _P], _C]],
Callable[
_P,
Callable[
[_A],
Callable[[_B], _C],
],
],
]:
...


@overload
def curry_flip(
num_args: Literal[3],
) -> Callable[
[Callable[Concatenate[_A, _B, _C, _P], _D]],
Callable[
_P,
Callable[
[_A],
Callable[
[_B],
Callable[[_C], _D],
],
],
],
]:
...


@overload
def curry_flip(
num_args: Literal[4],
) -> Callable[
[Callable[Concatenate[_A, _B, _C, _D, _P], _E]],
Callable[
_P,
Callable[
[_A],
Callable[
[_B],
Callable[[_C], Callable[[_D], _E]],
],
],
],
]:
...


def curry_flip(
num_args: _Arity,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""A flipped curry decorator.

Makes a function curried, but flips the curried arguments to become
the last arguments. This is very nice when having e.g optional
arguments after a source argument that will be piped.

Args:
num_args: The number of args to curry from the start of the
function

Example:
>>> @curry_flip(1)
... def map(source: List[int], mapper: Callable[[int], int]):
... return [mapper(x) for x in source]
>>>
>>> ys = pipe(xs, map(lambda x: x * 10))
"""

def _wrap_fun(fun: Callable[..., Any]) -> Callable[..., Any]:
def _wrap_args(*args: Any, **kwargs: Any) -> Callable[..., Any]:
def _wrap_curried(*curry_args: Any) -> Any:
return fun(*curry_args, *args, **kwargs)

return _curry((), num_args, _wrap_curried)

return _wrap_args if num_args else fun

return _wrap_fun


__all__ = ["curry", "curry_flip"]
18 changes: 10 additions & 8 deletions reactivex/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
compose,
typing,
)
from reactivex.curry import curry_flip
from reactivex.internal.basic import identity
from reactivex.internal.utils import NotSet
from reactivex.subject import Subject
Expand Down Expand Up @@ -454,9 +455,10 @@ def concat(*sources: Observable[_T]) -> Callable[[Observable[_T]], Observable[_T
return concat_(*sources)


@curry_flip(1)
def concat_map(
project: Mapper[_T1, Observable[_T2]]
) -> Callable[[Observable[_T1]], Observable[_T2]]:
source: Observable[_T1], project: Mapper[_T1, Observable[_T2]]
) -> Observable[_T2]:
"""Projects each source value to an Observable which is merged in the
output Observable, in a serialized fashion waiting for each one to complete
before merging the next.
Expand Down Expand Up @@ -488,7 +490,7 @@ def concat_map(

"""

return compose(map(project), merge(max_concurrent=1))
return source.pipe(map(project), merge(max_concurrent=1))


def contains(
Expand Down Expand Up @@ -1905,8 +1907,8 @@ def max_by(


def merge(
*sources: Observable[Any], max_concurrent: Optional[int] = None
) -> Callable[[Observable[Any]], Observable[Any]]:
*sources: Observable[_T], max_concurrent: Optional[int] = None
) -> Callable[[Observable[Observable[_T]]], Observable[_T]]:
"""Merges an observable sequence of observable sequences into an
observable sequence, limiting the number of concurrent
subscriptions to inner sequences. Or merges two observable
Expand All @@ -1930,9 +1932,9 @@ def merge(
observable sequence.

Returns:
An operator function that takes an observable source and
returns the observable sequence that merges the elements of the
inner sequences.
An operator function that takes an observable source and returns
the observable sequence that merges the elements of the inner
sequences.
"""
from ._merge import merge_

Expand Down
37 changes: 17 additions & 20 deletions reactivex/operators/_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from reactivex import Observable, compose
from reactivex import operators as ops
from reactivex.curry import curry_flip

_T = TypeVar("_T")

Expand Down Expand Up @@ -33,9 +34,10 @@ def buffer_toggle_(
)


@curry_flip(1)
def buffer_with_count_(
count: int, skip: Optional[int] = None
) -> Callable[[Observable[_T]], Observable[List[_T]]]:
source: Observable[_T], count: int, skip: Optional[int] = None
) -> Observable[List[_T]]:
"""Projects each element of an observable sequence into zero or more
buffers which are produced based on element count information.

Expand All @@ -54,27 +56,22 @@ def buffer_with_count_(
observable sequence of buffers.
"""

def buffer_with_count(source: Observable[_T]) -> Observable[List[_T]]:
nonlocal skip

if skip is None:
skip = count

def mapper(value: Observable[_T]) -> Observable[List[_T]]:
return value.pipe(
ops.to_list(),
)
if skip is None:
skip = count

def predicate(value: List[_T]) -> bool:
return len(value) > 0

return source.pipe(
ops.window_with_count(count, skip),
ops.flat_map(mapper),
ops.filter(predicate),
def mapper(value: Observable[_T]) -> Observable[List[_T]]:
return value.pipe(
ops.to_list(),
)

return buffer_with_count
def predicate(value: List[_T]) -> bool:
return len(value) > 0

return source.pipe(
ops.window_with_count(count, skip),
ops.flat_map(mapper),
ops.filter(predicate),
)


__all__ = ["buffer_", "buffer_with_count_", "buffer_when_", "buffer_toggle_"]
Loading