Skip to content

Commit

Permalink
[Framework] Port 7567 bug ocean integration is being terminated due t…
Browse files Browse the repository at this point in the history
…o OOM (#528)

# Description

What - the Ocean integration is being terminated due to OOM when dealing
with a large amount of entities to process
Why - With asyncio.gather, all items are gathered into a single list
before being processed, potentially causing memory issues if the list is
too large, while asyncio.Queue doesn't require holding all items in
memory at once.

## Type of change

Please leave one option from the following and delete the rest:

- [X] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.
  • Loading branch information
omby8888 committed May 12, 2024
1 parent 30120bc commit 98cf927
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 38 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.5.18 (2024-05-12)

### Improvements

- Added a utility function that runs multiple asynchronous tasks in a bounded way to prevent overload and memory issues
- Used that utility when calculating the JQ mapping for raw entities



## 0.5.17 (2024-05-01)

### Bug Fixes
Expand Down
25 changes: 11 additions & 14 deletions port_ocean/core/handlers/entity_processor/jq_entity_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
EntitySelectorDiff,
)
from port_ocean.exceptions.core import EntityProcessorException
from port_ocean.utils.queue_utils import process_in_queue


class JQEntityProcessor(BaseEntityProcessor):
Expand Down Expand Up @@ -128,23 +129,19 @@ async def _parse_items(
raw_entity_mappings: dict[str, Any] = mapping.port.entity.mappings.dict(
exclude_unset=True
)
calculate_entities_tasks = [
asyncio.create_task(
self._calculate_entity(
data,
raw_entity_mappings,
mapping.port.items_to_parse,
mapping.selector.query,
parse_all,
)
)
for data in raw_results
]
calculate_entities_results = await asyncio.gather(*calculate_entities_tasks)

calculated_entities_results = await process_in_queue(
raw_results,
self._calculate_entity,
raw_entity_mappings,
mapping.port.items_to_parse,
mapping.selector.query,
parse_all,
)

passed_entities = []
failed_entities = []
for entities_results in calculate_entities_results:
for entities_results in calculated_entities_results:
for entity, did_entity_pass_selector in entities_results:
if entity.get("identifier") and entity.get("blueprint"):
parsed_entity = Entity.parse_obj(entity)
Expand Down
50 changes: 27 additions & 23 deletions port_ocean/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,30 @@ def get_port_diff(
before: Iterable[Entity],
after: Iterable[Entity],
) -> EntityPortDiff:
return EntityPortDiff(
deleted=get_unique(
[
item
for item in before
if not any(is_same_entity(item, item_after) for item_after in after)
],
),
created=get_unique(
[
item
for item in after
if not any(is_same_entity(item, item_before) for item_before in before)
],
),
modified=get_unique(
[
item
for item in after
if any(is_same_entity(item, item_before) for item_before in before)
],
),
)
before_dict = {}
after_dict = {}
created = []
modified = []
deleted = []

# Create dictionaries for before and after lists
for entity in before:
key = (entity.identifier, entity.blueprint)
before_dict[key] = entity

for entity in after:
key = (entity.identifier, entity.blueprint)
after_dict[key] = entity

# Find created, modified, and deleted objects
for key, obj in after_dict.items():
if key not in before_dict:
created.append(obj)
else:
modified.append(obj)

for key, obj in before_dict.items():
if key not in after_dict:
deleted.append(obj)

return EntityPortDiff(created=created, modified=modified, deleted=deleted)
80 changes: 80 additions & 0 deletions port_ocean/utils/queue_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import asyncio
from asyncio import Queue, Task
from typing import Any, TypeVar, Callable, Coroutine

from loguru import logger

T = TypeVar("T")


async def _start_processor_worker(
queue: Queue[Any | None],
func: Callable[..., Coroutine[Any, Any, T]],
results: list[T],
) -> None:
while True:
raw_params = await queue.get()
try:
if raw_params is None:
return
logger.debug(f"Processing {raw_params[0]}")
results.append(await func(*raw_params))
finally:
queue.task_done()


async def process_in_queue(
objects_to_process: list[Any],
func: Callable[..., Coroutine[Any, Any, T]],
*args: Any,
concurrency: int = 50,
) -> list[T]:
"""
This function executes multiple asynchronous tasks in a bounded way
(e.g. having 200 tasks to execute, while running only 20 concurrently),
to prevent overload and memory issues when dealing with large sets of data and tasks.
Usage:
```python
async def incrementBy(num: int, increment_by: int) -> int:
await asyncio.sleep(3)
return num + increment_by
async def main():
raw_objects = [1, 2, 3, 4, 5]
processed_objects = await process_in_queue(
raw_objects,
incrementBy,
5
)
```
:param objects_to_process: A list of the raw objects to process
:param func: An async function that turns raw object into result object
:param args: Static arguments to pass to the func when called
:param concurrency: An integer specifying the concurrent workers count
:return: A list of result objects
"""
queue: Queue[Any | None] = Queue(maxsize=concurrency * 2)
tasks: list[Task[Any]] = []
processing_results: list[T] = []

for i in range(concurrency):
tasks.append(
asyncio.create_task(
_start_processor_worker(queue, func, processing_results)
)
)

for i in range(len(objects_to_process)):
await queue.put((objects_to_process[i], *args))

for i in range(concurrency):
# We put None value into the queue, so the workers will know that we
# are done sending more input and they can terminate
await queue.put(None)

await queue.join()
await asyncio.gather(*tasks)

return processing_results
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.5.17"
version = "0.5.18"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit 98cf927

Please sign in to comment.