From bbceaef21b374cba23a0274fad204f9352d7a850 Mon Sep 17 00:00:00 2001
From: Sherin Thomas
Date: Mon, 19 Dec 2022 07:37:14 +0000
Subject: [PATCH] revert

---
 src/lightning_app/components/serve/auto_scaler.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py
index 36ae720ae8943..6027249de850f 100644
--- a/src/lightning_app/components/serve/auto_scaler.py
+++ b/src/lightning_app/components/serve/auto_scaler.py
@@ -185,13 +185,15 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]):
     async def consumer(self):
         while True:
             await asyncio.sleep(0.05)
+
             batch = self._batch[: self.max_batch_size]
-            is_batch_ready = len(batch) == self.max_batch_size
-            is_batch_timeout = time.time() - self._last_batch_sent > self.timeout_batching
-            if batch and (is_batch_ready or is_batch_timeout):
+            while batch and (
+                (len(batch) == self.max_batch_size) or ((time.time() - self._last_batch_sent) > self.timeout_batching)
+            ):
                 asyncio.create_task(self.send_batch(batch))
-                # resetting the batch array, TODO - not locking the array
-                self._batch = self._batch[len(batch) :]
+
+                self._batch = self._batch[self.max_batch_size :]
+                batch = self._batch[: self.max_batch_size]
                 self._last_batch_sent = time.time()
 
     async def process_request(self, data: BaseModel):
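
For context, the sketch below isolates the consumer loop that the "+" side of this hunk restores: an inner while that drains every full batch queued since the last 50 ms wake-up, and flushes a trailing partial batch once timeout_batching elapses, whereas the removed "if" version dispatched at most one batch per tick. The _BatchingConsumer class, the stub send_batch, and the _demo driver are hypothetical scaffolding for illustration only, not part of the patch; only the body of consumer mirrors the patched code.

    import asyncio
    import time


    class _BatchingConsumer:
        """Illustrative stand-in for the batching half of the AutoScaler."""

        def __init__(self, max_batch_size: int = 4, timeout_batching: float = 1.0):
            self.max_batch_size = max_batch_size
            self.timeout_batching = timeout_batching
            self._batch = []  # pending (request_id, payload) tuples
            self._last_batch_sent = time.time()

        async def send_batch(self, batch):
            # Stand-in for the real dispatch to a serving work.
            print(f"dispatching {len(batch)} request(s): {[rid for rid, _ in batch]}")

        async def consumer(self):
            while True:
                await asyncio.sleep(0.05)

                batch = self._batch[: self.max_batch_size]
                # Keep dispatching while a full batch is queued; once only a
                # partial batch remains, send it after timeout_batching has
                # elapsed. The reverted "if" version could send at most one
                # batch per tick and fell behind when requests arrived faster
                # than one batch per 50 ms.
                while batch and (
                    (len(batch) == self.max_batch_size)
                    or ((time.time() - self._last_batch_sent) > self.timeout_batching)
                ):
                    asyncio.create_task(self.send_batch(batch))
                    self._batch = self._batch[self.max_batch_size :]
                    batch = self._batch[: self.max_batch_size]
                    self._last_batch_sent = time.time()


    async def _demo():
        c = _BatchingConsumer(max_batch_size=4, timeout_batching=0.2)
        c._batch = [(f"req-{i}", {"x": i}) for i in range(10)]  # 10 queued requests
        task = asyncio.create_task(c.consumer())
        # Two full batches of 4 go out on the first tick; the trailing batch
        # of 2 goes out once 0.2 s have passed since the last dispatch.
        await asyncio.sleep(0.5)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


    if __name__ == "__main__":
        asyncio.run(_demo())

Note that the loop still slices and reassigns self._batch without a lock, which the deleted TODO comment had flagged; single-threaded asyncio makes this safe here only because consumer never awaits between the slice and the reassignment.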