From 5e15ac64a0c675005e052cea6647aad5b33d73b8 Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 31 Dec 2022 12:07:15 +0100 Subject: [PATCH 01/11] Refactor using curry_flip from Expression library --- reactivex/curry.py | 216 ++++++++++++++++++++++++++++++++ reactivex/operators/__init__.py | 18 +-- 2 files changed, 226 insertions(+), 8 deletions(-) create mode 100644 reactivex/curry.py diff --git a/reactivex/curry.py b/reactivex/curry.py new file mode 100644 index 00000000..d20e086b --- /dev/null +++ b/reactivex/curry.py @@ -0,0 +1,216 @@ +from typing import Any, Callable, Literal, Tuple, TypeVar, overload + +from typing_extensions import Concatenate, ParamSpec + +_P = ParamSpec("_P") +_A = TypeVar("_A") +_B = TypeVar("_B") +_C = TypeVar("_C") +_D = TypeVar("_D") +_E = TypeVar("_E") + +_Arity = Literal[0, 1, 2, 3, 4] + + +def _curry( + args: Tuple[Any, ...], arity: int, fun: Callable[..., Any] +) -> Callable[..., Any]: + def wrapper(*args_: Any, **kw: Any) -> Any: + if arity == 1: + return fun(*args, *args_, **kw) + return _curry(args + args_, arity - 1, fun) + + return wrapper + + +@overload +def curry(num_args: Literal[0]) -> Callable[[Callable[_P, _B]], Callable[_P, _B]]: + ... + + +@overload +def curry( + num_args: Literal[1], +) -> Callable[[Callable[Concatenate[_A, _P], _B]], Callable[[_A], Callable[_P, _B]]]: + ... + + +@overload +def curry( + num_args: Literal[2], +) -> Callable[ + [Callable[Concatenate[_A, _B, _P], _C]], + Callable[ + [_A], + Callable[ + [_B], + Callable[_P, _C], + ], + ], +]: + ... + + +@overload +def curry( + num_args: Literal[3], +) -> Callable[ + [Callable[Concatenate[_A, _B, _C, _P], _D]], + Callable[ + [_A], + Callable[ + [_B], + Callable[ + [_C], + Callable[_P, _D], + ], + ], + ], +]: + ... + + +@overload +def curry( + num_args: Literal[4], +) -> Callable[ + [Callable[Concatenate[_A, _B, _C, _D, _P], _E]], + Callable[ + [_A], + Callable[ + [_B], + Callable[ + [_C], + Callable[[_D], Callable[_P, _E]], + ], + ], + ], +]: + ... + + +def curry(num_args: _Arity) -> Callable[..., Any]: + """A curry decorator. + + Makes a function curried. + + Args: + num_args: The number of args to curry from the start of the + function + + Example: + >>> @curry(1) + ... def add(a: int, b: int) -> int: + ... return a + b + >>> + >>> assert add(3)(4) == 7 + """ + + def wrapper(fun: Callable[..., Any]) -> Callable[..., Any]: + return _curry((), num_args + 1, fun) + + return wrapper + + +@overload +def curry_flip( + num_args: Literal[0], +) -> Callable[[Callable[_P, _A]], Callable[_P, _A]]: + ... + + +@overload +def curry_flip( + num_args: Literal[1], +) -> Callable[[Callable[Concatenate[_A, _P], _B]], Callable[_P, Callable[[_A], _B]]]: + ... + + +@overload +def curry_flip( + num_args: Literal[2], +) -> Callable[ + [Callable[Concatenate[_A, _B, _P], _C]], + Callable[ + _P, + Callable[ + [_A], + Callable[[_B], _C], + ], + ], +]: + ... + + +@overload +def curry_flip( + num_args: Literal[3], +) -> Callable[ + [Callable[Concatenate[_A, _B, _C, _P], _D]], + Callable[ + _P, + Callable[ + [_A], + Callable[ + [_B], + Callable[[_C], _D], + ], + ], + ], +]: + ... + + +@overload +def curry_flip( + num_args: Literal[4], +) -> Callable[ + [Callable[Concatenate[_A, _B, _C, _D, _P], _E]], + Callable[ + _P, + Callable[ + [_A], + Callable[ + [_B], + Callable[[_C], Callable[[_D], _E]], + ], + ], + ], +]: + ... + + +def curry_flip( + num_args: _Arity, +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """A flipped curry decorator. + + Makes a function curried, but flips the curried arguments to become + the last arguments. This is very nice when having e.g optional + arguments after a source argument that will be piped. + + Args: + num_args: The number of args to curry from the start of the + function + + Example: + >>> @curry_flip(1) + ... def map(source: List[int], mapper: Callable[[int], int]): + ... return [mapper(x) for x in source] + >>> + >>> ys = pipe(xs, map(lambda x: x * 10)) + """ + + def _wrap_fun(fun: Callable[..., Any]) -> Callable[..., Any]: + def _wrap_args(*args: Any, **kwargs: Any) -> Callable[..., Any]: + def _wrap_curried(*curry_args: Any) -> Any: + return fun(*curry_args, *args, **kwargs) + + return _curry((), num_args, _wrap_curried) + + return _wrap_args if num_args else fun + + return _wrap_fun + + +__all__ = ["curry", "curry_flip"] diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index 43a126fd..24d0e0c6 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -28,6 +28,7 @@ compose, typing, ) +from reactivex.curry import curry_flip from reactivex.internal.basic import identity from reactivex.internal.utils import NotSet from reactivex.subject import Subject @@ -454,9 +455,10 @@ def concat(*sources: Observable[_T]) -> Callable[[Observable[_T]], Observable[_T return concat_(*sources) +@curry_flip(1) def concat_map( - project: Mapper[_T1, Observable[_T2]] -) -> Callable[[Observable[_T1]], Observable[_T2]]: + source: Observable[_T1], project: Mapper[_T1, Observable[_T2]] +) -> Observable[_T2]: """Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next. @@ -488,7 +490,7 @@ def concat_map( """ - return compose(map(project), merge(max_concurrent=1)) + return source.pipe(map(project), merge(max_concurrent=1)) def contains( @@ -1905,8 +1907,8 @@ def max_by( def merge( - *sources: Observable[Any], max_concurrent: Optional[int] = None -) -> Callable[[Observable[Any]], Observable[Any]]: + *sources: Observable[_T], max_concurrent: Optional[int] = None +) -> Callable[[Observable[Observable[_T]]], Observable[_T]]: """Merges an observable sequence of observable sequences into an observable sequence, limiting the number of concurrent subscriptions to inner sequences. Or merges two observable @@ -1930,9 +1932,9 @@ def merge( observable sequence. Returns: - An operator function that takes an observable source and - returns the observable sequence that merges the elements of the - inner sequences. + An operator function that takes an observable source and returns + the observable sequence that merges the elements of the inner + sequences. """ from ._merge import merge_ From d780af90a7a83f9f33fc17297b02fc7a9bbd00f7 Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 31 Dec 2022 12:12:52 +0100 Subject: [PATCH 02/11] Use Literal from typing_extensions --- reactivex/curry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reactivex/curry.py b/reactivex/curry.py index d20e086b..fb7099d7 100644 --- a/reactivex/curry.py +++ b/reactivex/curry.py @@ -1,6 +1,6 @@ -from typing import Any, Callable, Literal, Tuple, TypeVar, overload +from typing import Any, Callable, Tuple, TypeVar, overload -from typing_extensions import Concatenate, ParamSpec +from typing_extensions import Concatenate, ParamSpec, Literal _P = ParamSpec("_P") _A = TypeVar("_A") From a115165bef37a3833ddb0528c072512d46eddca5 Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 31 Dec 2022 12:15:08 +0100 Subject: [PATCH 03/11] Fix map operator --- reactivex/operators/_map.py | 68 ++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 35 deletions(-) diff --git a/reactivex/operators/_map.py b/reactivex/operators/_map.py index 48b4c094..ce41834b 100644 --- a/reactivex/operators/_map.py +++ b/reactivex/operators/_map.py @@ -3,6 +3,7 @@ from reactivex import Observable, abc, compose from reactivex import operators as ops from reactivex import typing +from reactivex.curry import curry_flip from reactivex.internal.basic import identity from reactivex.internal.utils import infinite from reactivex.typing import Mapper, MapperIndexed @@ -11,48 +12,45 @@ _T2 = TypeVar("_T2") +@curry_flip(1) def map_( - mapper: Optional[Mapper[_T1, _T2]] = None -) -> Callable[[Observable[_T1]], Observable[_T2]]: - - _mapper = mapper or cast(Mapper[_T1, _T2], identity) - - def map(source: Observable[_T1]) -> Observable[_T2]: - """Partially applied map operator. + source: Observable[_T1], mapper: Optional[Mapper[_T1, _T2]] = None +) -> Observable[_T2]: + """Partially applied map operator. - Project each element of an observable sequence into a new form - by incorporating the element's index. + Project each element of an observable sequence into a new form + by incorporating the element's index. - Example: - >>> map(source) + Example: + >>> map(source) - Args: - source: The observable source to transform. + Args: + source: The observable source to transform. - Returns: - Returns an observable sequence whose elements are the - result of invoking the transform function on each element - of the source. - """ + Returns: + Returns an observable sequence whose elements are the + result of invoking the transform function on each element + of the source. + """ - def subscribe( - obv: abc.ObserverBase[_T2], scheduler: Optional[abc.SchedulerBase] = None - ) -> abc.DisposableBase: - def on_next(value: _T1) -> None: - try: - result = _mapper(value) - except Exception as err: # pylint: disable=broad-except - obv.on_error(err) - else: - obv.on_next(result) - - return source.subscribe( - on_next, obv.on_error, obv.on_completed, scheduler=scheduler - ) - - return Observable(subscribe) + _mapper = mapper or cast(Mapper[_T1, _T2], identity) - return map + def subscribe( + obv: abc.ObserverBase[_T2], scheduler: Optional[abc.SchedulerBase] = None + ) -> abc.DisposableBase: + def on_next(value: _T1) -> None: + try: + result = _mapper(value) + except Exception as err: # pylint: disable=broad-except + obv.on_error(err) + else: + obv.on_next(result) + + return source.subscribe( + on_next, obv.on_error, obv.on_completed, scheduler=scheduler + ) + + return Observable(subscribe) def map_indexed_( From 901a6fb723bf48be9450e0b8e822e947e549816d Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 31 Dec 2022 12:22:58 +0100 Subject: [PATCH 04/11] Code fixes (isort) --- reactivex/curry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reactivex/curry.py b/reactivex/curry.py index fb7099d7..a638f5fe 100644 --- a/reactivex/curry.py +++ b/reactivex/curry.py @@ -1,6 +1,6 @@ from typing import Any, Callable, Tuple, TypeVar, overload -from typing_extensions import Concatenate, ParamSpec, Literal +from typing_extensions import Concatenate, Literal, ParamSpec _P = ParamSpec("_P") _A = TypeVar("_A") From 482146e6d99e0259c627d2d4b4c8fcbb1cd75dad Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 31 Dec 2022 12:38:48 +0100 Subject: [PATCH 05/11] Exclude curry.py for mypy --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 66941d57..10cbc674 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ float_to_top=true python_version = "3.9" follow_imports = "silent" files = ["reactivex"] -exclude = ["reactivex/operators/_\\w.*\\.py$"] # mypy will eventually catch up +exclude = ["reactivex/operators/_\\w.*\\.py$", "reactivex/curry\\.py"] # mypy will eventually catch up disallow_any_generics = true disallow_untyped_defs = true From 0acdea128de6ee00dbe3bfd29f6657e1f96cb741 Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 31 Dec 2022 13:13:43 +0100 Subject: [PATCH 06/11] Fix mypy config --- .pre-commit-config.yaml | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d24cee0d..f4a21c6a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -30,7 +30,7 @@ repos: repo: local - hooks: - id: mypy - exclude: (^docs/|^examples/|^notebooks/|^tests/|^reactivex/operators/_\w.*\.py$) + exclude: (^docs/|^examples/|^notebooks/|^tests/|^reactivex/operators/_\w.*\.py$|^reactivex/curry\.py$) repo: https://github.com/pre-commit/mirrors-mypy rev: v0.942 diff --git a/pyproject.toml b/pyproject.toml index 10cbc674..d0d17523 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ float_to_top=true python_version = "3.9" follow_imports = "silent" files = ["reactivex"] -exclude = ["reactivex/operators/_\\w.*\\.py$", "reactivex/curry\\.py"] # mypy will eventually catch up +exclude = ["reactivex/operators/_\\w.*\\.py$", "reactivex/curry\\.py$"] # mypy will eventually catch up disallow_any_generics = true disallow_untyped_defs = true From b50bde2a612b2276559e2e5f4db896c8f3ce6db2 Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 31 Dec 2022 13:19:38 +0100 Subject: [PATCH 07/11] Add attribution --- reactivex/curry.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/reactivex/curry.py b/reactivex/curry.py index a638f5fe..7ec813bd 100644 --- a/reactivex/curry.py +++ b/reactivex/curry.py @@ -1,3 +1,5 @@ +# This file is borrowed from the Expression library, licensed under the MIT license. +# https://github.com/cognitedata/Expression from typing import Any, Callable, Tuple, TypeVar, overload from typing_extensions import Concatenate, Literal, ParamSpec From bef11f2b299346fa0ec5610701b588ed2a52aa84 Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 1 Jan 2023 07:39:39 +0800 Subject: [PATCH 08/11] Use curry flip on buffer_with_count Propose some test cleanup (more readable) Propose typing overload for map_ Propose defining a UnaryOperator type --- reactivex/operators/_buffer.py | 40 +++++++++---------- reactivex/operators/_map.py | 11 +++++ reactivex/typing.py | 2 + tests/test_observable/test_bufferwithcount.py | 9 +++-- 4 files changed, 37 insertions(+), 25 deletions(-) diff --git a/reactivex/operators/_buffer.py b/reactivex/operators/_buffer.py index a8920add..cb1259e2 100644 --- a/reactivex/operators/_buffer.py +++ b/reactivex/operators/_buffer.py @@ -2,13 +2,15 @@ from reactivex import Observable, compose from reactivex import operators as ops +from reactivex.curry import curry_flip +from reactivex.typing import UnaryOperator _T = TypeVar("_T") def buffer_( boundaries: Observable[Any], -) -> Callable[[Observable[_T]], Observable[List[_T]]]: +) -> UnaryOperator[_T, List[_T]]: return compose( ops.window(boundaries), ops.flat_map(ops.to_list()), @@ -33,9 +35,10 @@ def buffer_toggle_( ) +@curry_flip(1) def buffer_with_count_( - count: int, skip: Optional[int] = None -) -> Callable[[Observable[_T]], Observable[List[_T]]]: + source: Observable[_T], count: int, skip: Optional[int] = None +) -> Observable[List[_T]]: """Projects each element of an observable sequence into zero or more buffers which are produced based on element count information. @@ -54,27 +57,22 @@ def buffer_with_count_( observable sequence of buffers. """ - def buffer_with_count(source: Observable[_T]) -> Observable[List[_T]]: - nonlocal skip - - if skip is None: - skip = count - - def mapper(value: Observable[_T]) -> Observable[List[_T]]: - return value.pipe( - ops.to_list(), - ) + if skip is None: + skip = count - def predicate(value: List[_T]) -> bool: - return len(value) > 0 - - return source.pipe( - ops.window_with_count(count, skip), - ops.flat_map(mapper), - ops.filter(predicate), + def mapper(value: Observable[_T]) -> Observable[List[_T]]: + return value.pipe( + ops.to_list(), ) - return buffer_with_count + def predicate(value: List[_T]) -> bool: + return len(value) > 0 + + return source.pipe( + ops.window_with_count(count, skip), + ops.flat_map(mapper), + ops.filter(predicate), + ) __all__ = ["buffer_", "buffer_with_count_", "buffer_when_", "buffer_toggle_"] diff --git a/reactivex/operators/_map.py b/reactivex/operators/_map.py index ce41834b..75286c0b 100644 --- a/reactivex/operators/_map.py +++ b/reactivex/operators/_map.py @@ -1,4 +1,5 @@ from typing import Callable, Optional, TypeVar, cast +from typing_extensions import overload from reactivex import Observable, abc, compose from reactivex import operators as ops @@ -12,6 +13,16 @@ _T2 = TypeVar("_T2") +@overload +def map_(source: Observable[_T1]) -> Observable[_T1]: + ... + + +@overload +def map_(source: Observable[_T1], mapper: Mapper[_T1, _T2]) -> Observable[_T2]: + ... + + @curry_flip(1) def map_( source: Observable[_T1], mapper: Optional[Mapper[_T1, _T2]] = None diff --git a/reactivex/typing.py b/reactivex/typing.py index baf10c25..b85c0349 100644 --- a/reactivex/typing.py +++ b/reactivex/typing.py @@ -2,6 +2,7 @@ from typing import Callable, TypeVar, Union from .abc.observable import Subscription +from .observable import Observable from .abc.observer import OnCompleted, OnError, OnNext from .abc.periodicscheduler import ( ScheduledPeriodicAction, @@ -28,6 +29,7 @@ Comparer = Callable[[_T1, _T1], bool] SubComparer = Callable[[_T1, _T1], int] Accumulator = Callable[[_TState, _T1], _TState] +UnaryOperator = Callable[[Observable[_T1]], Observable[_T2]] Startable = Union[StartableBase, Thread] diff --git a/tests/test_observable/test_bufferwithcount.py b/tests/test_observable/test_bufferwithcount.py index 6bc10cfe..ac34a1a3 100644 --- a/tests/test_observable/test_bufferwithcount.py +++ b/tests/test_observable/test_bufferwithcount.py @@ -2,6 +2,7 @@ from reactivex import operators as ops from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.subscription import Subscription on_next = ReactiveTest.on_next on_completed = ReactiveTest.on_completed @@ -210,7 +211,7 @@ def create(): return xs.pipe(ops.buffer_with_count(2, 3)) results = scheduler.start(create).messages - self.assertEqual(3, len(results)) - assert sequence_equal(results[0].value.value, [2, 3]) and results[0].time == 220 - assert sequence_equal(results[1].value.value, [5]) and results[1].time == 250 - assert results[2].value.kind == "C" and results[2].time == 250 + self.assertEqual( + results, [on_next(220, [2, 3]), on_next(250, [5]), on_completed(250)] + ) + self.assertEqual(xs.subscriptions, [Subscription(200, 250)]) From b349f25cc5fe0d53b81f6711956a9c194a78f362 Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 1 Jan 2023 07:47:48 +0800 Subject: [PATCH 09/11] Fix overloads on map Propose MonoTypeOperator type Propose documentation draft for curry flip --- docs/get_started.rst | 21 +++++++++++++++++++++ reactivex/operators/_map.py | 6 +++++- reactivex/typing.py | 1 + 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/docs/get_started.rst b/docs/get_started.rst index e838cd58..275b91f9 100644 --- a/docs/get_started.rst +++ b/docs/get_started.rst @@ -221,6 +221,27 @@ Output: Received delta Received epsilon +Version x.x.x introduces the ``curry_flip`` decorator to make the creation of custom +operators less verbose: + +.. code:: python + + import reactivex + + @curry_flip(1) + def lowercase(source): + def subscribe(observer, scheduler = None): + def on_next(value): + observer.on_next(value.lower()) + + return source.subscribe( + on_next, + observer.on_error, + observer.on_completed, + scheduler=scheduler) + return reactivex.create(subscribe) + + Concurrency ----------- diff --git a/reactivex/operators/_map.py b/reactivex/operators/_map.py index 75286c0b..90ce18b4 100644 --- a/reactivex/operators/_map.py +++ b/reactivex/operators/_map.py @@ -13,13 +13,17 @@ _T2 = TypeVar("_T2") +@curry_flip(1) @overload def map_(source: Observable[_T1]) -> Observable[_T1]: ... +@curry_flip(1) @overload -def map_(source: Observable[_T1], mapper: Mapper[_T1, _T2]) -> Observable[_T2]: +def map_( + source: Observable[_T1], mapper: Optional[Mapper[_T1, _T2]] +) -> Observable[_T2]: ... diff --git a/reactivex/typing.py b/reactivex/typing.py index b85c0349..864bee66 100644 --- a/reactivex/typing.py +++ b/reactivex/typing.py @@ -30,6 +30,7 @@ SubComparer = Callable[[_T1, _T1], int] Accumulator = Callable[[_TState, _T1], _TState] UnaryOperator = Callable[[Observable[_T1]], Observable[_T2]] +MonoTypeOperator = UnaryOperator[_T1, _T1] Startable = Union[StartableBase, Thread] From b6696bb6a6544d6834df8c3fc797e3276caab059 Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 1 Jan 2023 07:55:08 +0800 Subject: [PATCH 10/11] iSort fix --- docs/get_started.rst | 3 +++ reactivex/operators/_map.py | 1 + reactivex/typing.py | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/get_started.rst b/docs/get_started.rst index 275b91f9..46289721 100644 --- a/docs/get_started.rst +++ b/docs/get_started.rst @@ -241,6 +241,9 @@ operators less verbose: scheduler=scheduler) return reactivex.create(subscribe) +When building more complex operators which take arguments or even optional arguments, +``curry_flip`` allows to always keep the ``source`` as first argument in the definition. + Concurrency ----------- diff --git a/reactivex/operators/_map.py b/reactivex/operators/_map.py index 90ce18b4..92a2bfe5 100644 --- a/reactivex/operators/_map.py +++ b/reactivex/operators/_map.py @@ -1,4 +1,5 @@ from typing import Callable, Optional, TypeVar, cast + from typing_extensions import overload from reactivex import Observable, abc, compose diff --git a/reactivex/typing.py b/reactivex/typing.py index 864bee66..eeb9b260 100644 --- a/reactivex/typing.py +++ b/reactivex/typing.py @@ -2,7 +2,6 @@ from typing import Callable, TypeVar, Union from .abc.observable import Subscription -from .observable import Observable from .abc.observer import OnCompleted, OnError, OnNext from .abc.periodicscheduler import ( ScheduledPeriodicAction, @@ -15,6 +14,7 @@ ScheduledAction, ) from .abc.startable import StartableBase +from .observable import Observable _TState = TypeVar("_TState") _T1 = TypeVar("_T1") From 5873b211348a96b81e0820b5686e205b4fe5586b Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 1 Jan 2023 08:13:34 +0800 Subject: [PATCH 11/11] Drop typing idea Hint on contributing, checks to run --- docs/contributing.rst | 7 +++++++ reactivex/operators/_buffer.py | 3 +-- reactivex/typing.py | 3 --- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/docs/contributing.rst b/docs/contributing.rst index 1e929cc2..5d996014 100644 --- a/docs/contributing.rst +++ b/docs/contributing.rst @@ -11,3 +11,10 @@ Please register any issues to `ReactiveX/RxPY/issues `_ branch. + +Before submitting the pull requets, be sure to run + +.. code:: shell + + poetry run pre-commit run --all-files --show-diff-on-failure + poetry run coverage run -m pytest && poetry run coverage report -m diff --git a/reactivex/operators/_buffer.py b/reactivex/operators/_buffer.py index cb1259e2..900da105 100644 --- a/reactivex/operators/_buffer.py +++ b/reactivex/operators/_buffer.py @@ -3,14 +3,13 @@ from reactivex import Observable, compose from reactivex import operators as ops from reactivex.curry import curry_flip -from reactivex.typing import UnaryOperator _T = TypeVar("_T") def buffer_( boundaries: Observable[Any], -) -> UnaryOperator[_T, List[_T]]: +) -> Callable[[Observable[_T]], Observable[List[_T]]]: return compose( ops.window(boundaries), ops.flat_map(ops.to_list()), diff --git a/reactivex/typing.py b/reactivex/typing.py index eeb9b260..baf10c25 100644 --- a/reactivex/typing.py +++ b/reactivex/typing.py @@ -14,7 +14,6 @@ ScheduledAction, ) from .abc.startable import StartableBase -from .observable import Observable _TState = TypeVar("_TState") _T1 = TypeVar("_T1") @@ -29,8 +28,6 @@ Comparer = Callable[[_T1, _T1], bool] SubComparer = Callable[[_T1, _T1], int] Accumulator = Callable[[_TState, _T1], _TState] -UnaryOperator = Callable[[Observable[_T1]], Observable[_T2]] -MonoTypeOperator = UnaryOperator[_T1, _T1] Startable = Union[StartableBase, Thread]