Skip to content

Commit

Permalink
Use deque in ReplaySubject (#646)
Browse files Browse the repository at this point in the history
  • Loading branch information
timothy-shields authored and dbrattli committed Jun 9, 2022
1 parent 54eaa94 commit 430cfab
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions reactivex/subject/replaysubject.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
from collections import deque
from datetime import datetime, timedelta
from typing import Any, List, NamedTuple, Optional, TypeVar, cast
from typing import Any, Deque, NamedTuple, Optional, TypeVar, cast

from reactivex.observer.scheduledobserver import ScheduledObserver
from reactivex.scheduler import CurrentThreadScheduler
Expand Down Expand Up @@ -56,7 +57,7 @@ def __init__(
self.window = (
timedelta.max if window is None else self.scheduler.to_timedelta(window)
)
self.queue: List[QueueItem] = []
self.queue: Deque[QueueItem] = deque()

def _subscribe_core(
self,
Expand Down Expand Up @@ -84,10 +85,10 @@ def _subscribe_core(

def _trim(self, now: datetime) -> None:
while len(self.queue) > self.buffer_size:
self.queue.pop(0)
self.queue.popleft()

while self.queue and (now - self.queue[0].interval) > self.window:
self.queue.pop(0)
self.queue.popleft()

def _on_next_core(self, value: _T) -> None:
"""Notifies all subscribed observers with the value."""
Expand Down

0 comments on commit 430cfab

Please sign in to comment.