
Run many tasks on one worker #287

Closed
Totktonada opened this issue Apr 2, 2021 · 2 comments · Fixed by #434
Assignees
Labels
bug Something isn't working
Milestone

Comments

@Totktonada
Member

test-run puts tasks into queues at start, but the size of those queues is not infinite (AFAIR, it is restricted by the size of a pipe buffer).

How to reproduce:

./test/test-run.py -j 1 $(yes box-tap/key_def.test.lua | head -n 10000)
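The pipe-buffer limit mentioned above can be observed directly. This is a minimal sketch (not part of test-run) that writes into a pipe in non-blocking mode until the next write would block, which reveals the capacity that bounds the queue:

```python
import fcntl
import os

# Sketch: measure how much data fits into a pipe buffer before a write
# would block -- the limit the issue describes (typically 64 KiB on Linux).
r, w = os.pipe()
fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)

total = 0
try:
    while True:
        # Each successful write reports how many bytes actually went in.
        total += os.write(w, b'x' * 4096)
except BlockingIOError:
    # The buffer is full; a blocking writer would hang here.
    pass

print(total)  # pipe buffer capacity in bytes, e.g. 65536 on default Linux

os.close(r)
os.close(w)
```

A blocking `put()` into a `SimpleQueue` with no reader on the other end hits exactly this limit.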
@Totktonada Totktonada added the bug Something isn't working label Apr 2, 2021
@kyukhin kyukhin added the teamQ label Sep 17, 2021
@kyukhin kyukhin added this to the wishlist milestone Sep 17, 2021
@Totktonada
Member Author

Since refactoring the input/output queue processing seems a bit complicated, I've created a simple workaround.

diff --git a/lib/worker.py b/lib/worker.py
index db54de9..d7cd479 100644
--- a/lib/worker.py
+++ b/lib/worker.py
@@ -95,12 +95,24 @@ def get_task_groups():
         stable_task_ids = [task.id for task in suite.stable_tests()]
         fragile_task_ids = [task.id for task in suite.fragile_tests()]
         if stable_task_ids:
-            res[key] = {
-                'gen_worker': gen_worker,
-                'task_ids': stable_task_ids,
-                'is_parallel': suite.is_parallel(),
-                'show_reproduce_content': suite.show_reproduce_content(),
-            }
+            if len(stable_task_ids) > 100:
+                group_max_size = 100
+                group_num = 1
+                for i in range(0, len(stable_task_ids), group_max_size):
+                    res[key + '_' + str(group_num)] = {
+                        'gen_worker': gen_worker,
+                        'task_ids': stable_task_ids[i:i+group_max_size],
+                        'is_parallel': suite.is_parallel(),
+                        'show_reproduce_content': suite.show_reproduce_content(),
+                    }
+                    group_num += 1
+            else:
+                res[key] = {
+                    'gen_worker': gen_worker,
+                    'task_ids': stable_task_ids,
+                    'is_parallel': suite.is_parallel(),
+                    'show_reproduce_content': suite.show_reproduce_content(),
+                }
         if fragile_task_ids:
             res[key + '_fragile'] = {
                 'gen_worker': gen_worker,

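The chunking logic in the diff above can be sketched in isolation. This is illustrative only (the helper name is not from test-run); it shows how a long task list is split into groups of at most 100, so each group's queue stays below the pipe buffer limit:

```python
# Sketch: split a long task list into groups of at most group_max_size,
# mirroring the workaround's loop over range(0, len(...), group_max_size).
def chunk_task_ids(task_ids, group_max_size=100):
    return [task_ids[i:i + group_max_size]
            for i in range(0, len(task_ids), group_max_size)]

groups = chunk_task_ids(list(range(250)))
print([len(g) for g in groups])  # [100, 100, 50]
```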
@Totktonada
Member Author

I've found that the problem of blocking on a write to the underlying pipe is solved by multiprocessing.Queue (even on Python 2). From the Python documentation:

    When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

SimpleQueue writes to the underlying pipe directly, while Queue accumulates incoming data in an intermediate buffer that is flushed to the pipe by a separate thread in the background.
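The feeder-thread behavior can be demonstrated with a short sketch: many more items than a pipe buffer holds are put on a `multiprocessing.Queue` with no consumer running, and `put()` never blocks because the feeder thread absorbs the backlog:

```python
import multiprocessing

# Sketch: unlike SimpleQueue, Queue.put() returns immediately even when
# the underlying pipe buffer is full -- a background feeder thread drains
# the internal buffer into the pipe as the reader catches up.
q = multiprocessing.Queue()
N = 10000
for i in range(N):
    q.put(i)  # does not block, although nobody has read anything yet

# Drain the queue so the feeder thread can flush and the process can
# exit cleanly; order is preserved with a single producer.
items = [q.get() for _ in range(N)]
print(len(items))  # 10000
```

The equivalent loop over a `SimpleQueue` would hang once the pickled items exceed the pipe buffer capacity.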

@Totktonada Totktonada self-assigned this May 29, 2024
Totktonada added a commit that referenced this issue May 29, 2024
A task queue dispatcher puts all the tasks to the task queue at startup.
Then workers are started and are taking the tasks from it.

If there are many tasks in a task group (which roughly corresponds to a
test suite), we can reach the pipe buffer size on putting into the
queue, because `multiprocessing.SimpleQueue` uses a pipe under the hood.

The solution is to use `multiprocessing.Queue`, which has an
intermediate buffer before the underlying pipe and writes to the pipe in
a background thread, without blocking a thread that calls
`<queue>.put()`.

The `Queue` API is a superset of the `SimpleQueue` API, so we can just
replace the implementation.

Let's also use `Queue` for the worker's output queue to be on the safe
side and for consistency.

Fixes #287
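Since `Queue` exposes everything `SimpleQueue` offers (`put`, `get`, `empty`, `close`), the replacement described above is essentially a one-line change. A hedged sketch, not test-run's actual code:

```python
import multiprocessing

# Sketch: swapping the implementation while keeping the same call sites.
task_queue = multiprocessing.Queue()  # was: multiprocessing.SimpleQueue()
task_queue.put('box-tap/key_def.test.lua')
item = task_queue.get()
print(item)  # box-tap/key_def.test.lua
```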
ylobankov pushed a commit that referenced this issue May 29, 2024