Skip to content

Commit

Permalink
Type fixes (#629)
Browse files Browse the repository at this point in the history
- Add dependency on typing extensions to we can use param specifiers for wrapper functions
- Fix for typing of aliased functions
- Fix for typing of synchronized functions
- Fix type error in flat-map
  • Loading branch information
dbrattli committed Mar 11, 2022
1 parent 0e5a07b commit 9bb8393
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ repos:
language: node
pass_filenames: false
types: [python]
additional_dependencies: ["pyright@1.1.227"]
additional_dependencies: ["pyright@1.1.228"]
repo: local
- hooks:
- id: mypy
Expand Down
6 changes: 4 additions & 2 deletions examples/timeflies/timeflies_tkinter.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ def label2stream(
def char2label(char: str) -> Label:
return Label(frame, text=char)

reactivex.from_(text).pipe(
xs = reactivex.of(text).pipe(
ops.map(char2label),
ops.flat_map_indexed(label2stream),
).subscribe(on_next, on_error=print, scheduler=scheduler)
)

xs.subscribe(on_next, on_error=print, scheduler=scheduler)

frame.pack()
root.mainloop()
Expand Down
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ packages = [

[tool.poetry.dependencies]
python = ">= 3.7, < 3.11"
typing-extensions = "^4.1.1"

[tool.poetry.dev-dependencies]
pytest-asyncio = "^0.18.1"
Expand Down
4 changes: 2 additions & 2 deletions reactivex/abc/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
OnCompleted = Callable[[], None]


class ObserverBase(Generic[_T], ABC):
class ObserverBase(Generic[_T_in], ABC):
"""Observer abstract base class
An Observer is the entity that receives all emissions of a
Expand All @@ -19,7 +19,7 @@ class ObserverBase(Generic[_T], ABC):
__slots__ = ()

@abstractmethod
def on_next(self, value: _T) -> None:
def on_next(self, value: _T_in) -> None:
"""Notifies the observer of a new element in the sequence.
Args:
Expand Down
3 changes: 2 additions & 1 deletion reactivex/abc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
RelativeTime = Union[timedelta, float]
AbsoluteOrRelativeTime = Union[datetime, timedelta, float]
ScheduledAction = Callable[
["SchedulerBase", Optional[_TState]], Optional[DisposableBase]
["SchedulerBase", Optional[_TState]],
Optional[DisposableBase],
]


Expand Down
13 changes: 9 additions & 4 deletions reactivex/internal/concurrency.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
from threading import RLock, Thread
from typing import Any, Callable
from typing import Any, Callable, TypeVar

from typing_extensions import ParamSpec

from reactivex.typing import StartableTarget

_T = TypeVar("_T")
_P = ParamSpec("_P")


def default_thread_factory(target: StartableTarget) -> Thread:
return Thread(target=target, daemon=True)


def synchronized(lock: RLock) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
def synchronized(lock: RLock) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]:
"""A decorator for synchronizing access to a given function."""

def wrapper(fn: Callable[..., Any]) -> Callable[..., Any]:
def inner(*args: Any, **kw: Any) -> Any:
def wrapper(fn: Callable[_P, _T]) -> Callable[_P, _T]:
def inner(*args: _P.args, **kw: _P.kwargs) -> Any:
with lock:
return fn(*args, **kw)

Expand Down
5 changes: 4 additions & 1 deletion reactivex/internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from types import FunctionType
from typing import TYPE_CHECKING, Any, Callable, Iterable, Optional, TypeVar, cast

from typing_extensions import ParamSpec

from reactivex import abc
from reactivex.disposable import CompositeDisposable
from reactivex.disposable.refcountdisposable import RefCountDisposable
Expand All @@ -10,6 +12,7 @@
from reactivex import Observable

_T = TypeVar("_T")
_P = ParamSpec("_P")


def add_ref(xs: "Observable[_T]", r: RefCountDisposable) -> "Observable[_T]":
Expand All @@ -30,7 +33,7 @@ def infinite() -> Iterable[int]:
n += 1


def alias(name: str, doc: str, fun: Callable[..., Any]) -> Callable[..., Any]:
def alias(name: str, doc: str, fun: Callable[_P, _T]) -> Callable[_P, _T]:
# Adapted from
# https://stackoverflow.com/questions/13503079/how-to-create-a-copy-of-a-python-function#
# See also help(type(lambda: 0))
Expand Down
15 changes: 9 additions & 6 deletions reactivex/operators/_flatmap.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from asyncio import Future
from typing import Any, Callable, Iterable, Optional, TypeVar, Union, cast
from typing import Any, Callable, Optional, TypeVar, Union, cast

from reactivex import Observable, from_, from_future
from reactivex import operators as ops
from reactivex.internal.basic import identity
from reactivex.typing import Mapper, MapperIndexed

_T1 = TypeVar("_T1")
Expand All @@ -14,14 +15,16 @@ def _flat_map_internal(
mapper: Optional[Mapper[_T1, Any]] = None,
mapper_indexed: Optional[MapperIndexed[_T1, Any]] = None,
) -> Observable[Any]:
def projection(x: _T1, i: int):
mapper_result = (
mapper(x) if mapper else mapper_indexed(x, i) if mapper_indexed else None
def projection(x: _T1, i: int) -> Observable[Any]:
mapper_result: Any = (
mapper(x)
if mapper
else mapper_indexed(x, i)
if mapper_indexed
else identity
)
if isinstance(mapper_result, Future):
result: Observable[Any] = from_future(cast("Future[Any]", mapper_result))
elif isinstance(mapper_result, Iterable):
result = from_(mapper_result)
elif isinstance(mapper_result, Observable):
result = mapper_result
else:
Expand Down

0 comments on commit 9bb8393

Please sign in to comment.