Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TMP TEST 2 #8964

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft

TMP TEST 2 #8964

wants to merge 16 commits into from

Conversation

Nusnus
Copy link
Member

@Nusnus Nusnus commented Apr 14, 2024

!!! DO NOT MERGE !!!

Testing if something works in the CI or not due to using Mac as a local dev machine where it does work.

def create(self, w):
if w.hub2 is None:
required_hub = getattr(w._conninfo, 'requires_hub', None)
w.hub2 = set_event_loop((
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn’t use the set_event_loop because it uses the global. It needs to create a new instance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to create a hub per consumer (e.g. from w.consumers)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Integrate logic into Hub class:

  1. If w.consumer is NOT none, go as usual.
  2. if w.consumers is NOT none, apply same logic for each consumer (to get its own hub (loop) instance).

@@ -137,7 +178,8 @@ def create(self, w):
max_restarts = None
if w.app.conf.worker_pool in GREEN_POOLS: # pragma: no cover
warnings.warn(UserWarning(W_POOL_SETTING))
threaded = not w.use_eventloop or IS_WINDOWS
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not w.use_eventloop or IS_WINDOWS or app.new_flag is not None

@@ -215,14 +257,16 @@ def create(self, w):
class Consumer(bootsteps.StartStopStep):
"""Bootstep starting the Consumer blueprint."""

requires = ('celery.worker.components:Hub2',)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not be needed then

last = True

def create(self, w):
if w.max_concurrency:
prefetch_count = max(w.max_concurrency, 1) * w.prefetch_multiplier
else:
prefetch_count = w.concurrency * w.prefetch_multiplier
c = w.consumer = self.instantiate(
c = w.consumer = [self.instantiate(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If new flag True:

  1. w.consumer = None
  2. w.consumers = List

Else:

  1. w.consumer = Normal usage
  2. w.consumers = None

@@ -181,7 +181,8 @@ def __init__(self, on_task_request,
pool=None, app=None,
timer=None, controller=None, hub=None, amqheartbeat=None,
worker_options=None, disable_rate_limits=False,
initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
initial_prefetch_count=2, prefetch_multiplier=1, url=None, **kwargs):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add new parameter to allow passing the url directly when instantiating a new Consumer but use None for the default behavior

@@ -158,7 +158,11 @@ def on_close(self):

def on_stopped(self):
self.timer.stop()
self.consumer.shutdown()
if isinstance(self.consumer, list):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adjusting the w.consumer to be a list, use the new flag to choose which consumer to act upon

@Nusnus Nusnus changed the title TMP TEST TMP TEST 2 Apr 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants