Future plan for Arq #437

Open
samuelcolvin opened this issue Mar 17, 2024 · 33 comments

@samuelcolvin
Owner

samuelcolvin commented Mar 17, 2024

Arq was the first real open source project I ever created, back in 2016. That was long before Pydantic, FastAPI, ParamSpec, or even Redis Streams.

I remember a sense of incredulity that I couldn't find an async variant of rq (which I was helping to maintain at the time). Surely I wasn't the only person wanting to queue jobs in async code? Apparently at the time I was.

Fast forward eight years, and I'm definitely not the only person trying to queue jobs in an async world.

Hence my incredulity has only grown: there's still no ubiquitous queuing library for async Python. Despite neglect, Arq still seems to work well for lots of people. I've used it in every role I've had since, and for the most part it just works.


That said, Arq needs some love, and since we're now using it at Pydantic, I think we should have the resources to provide that love later this year. This is a rough plan of what I propose to do.

Feedback very welcome from all, but especially @JonasKs and @pydantic-maintainers (who apparently I can't tag here :-()

In summary I want to significantly refactor the internals, and update the API in a backwards compatible way.

1. ParamSpec and type safety 🚧

The most important change we should make is to make Arq type safe using ParamSpec and Concatenate. I have a partially working demonstration of how this will work below.

We'll provide the same or similar method for enqueuing a job via its name for cases where the worker code is not accessible where jobs are enqueued.

We should be able to do this while still supporting the current API to make migration as easy as possible.

2. Redis Streams 🚀

The second most important change is to adopt Redis Streams, which read like they were designed for exactly this application. They mean we can effectively guarantee only one execution while still being resilient to unexpected shutdown (jobs shut down during execution will be rerun later).

This should be possible without breaking the current API at all.
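A minimal sketch of what a stream-based worker read could look like, under stated assumptions: `redis` is taken to be a redis-py asyncio client created with `decode_responses=True`, the consumer group already exists, and the stream, group and consumer names are hypothetical. The method names `xreadgroup` and `xack` follow redis-py; everything else is illustrative and is not Arq's implementation.

```python
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, str]], Awaitable[None]]


async def consume_once(
    redis: Any,  # assumed: redis.asyncio.Redis(decode_responses=True)
    handler: Handler,
    *,
    stream: str = 'arq:stream',  # hypothetical key name
    group: str = 'arq:workers',  # hypothetical consumer group
    consumer: str = 'worker-1',  # hypothetical consumer name
    block_ms: int = 5000,
) -> int:
    """Read one batch of new entries, run them, and ack only the successes."""
    # '>' asks for entries never delivered to any consumer in this group;
    # BLOCK makes the call wait server-side instead of polling.
    batches = await redis.xreadgroup(group, consumer, {stream: '>'}, count=10, block=block_ms)
    done = 0
    for _stream_name, entries in batches or []:
        for entry_id, fields in entries:
            try:
                await handler(fields)
            except Exception:
                # No XACK: the entry stays in the group's pending list,
                # so it can be claimed and rerun later.
                continue
            await redis.xack(stream, group, entry_id)
            done += 1
    return done
```

Because the acknowledgement only happens after the handler returns, a worker that is killed mid-job leaves the entry pending rather than losing it.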

3. Avoid sorted set for immediate jobs 🚀

Current Arq is slower than it should be because it uses a sorted set to queue all jobs. The sorted set provides two things:

  • a way to avoid losing jobs that are cancelled during shutdown or fail - this should be solved with streams (see above)
  • a way to defer jobs so they're run in the future

The idea would be to only use the sorted set for jobs scheduled to run in the future, then use the logic demonstrated by SAQ to take jobs off the sorted set when they're ready to be run and add them to the stream.

Jobs which are enqueued without a delay can be added to the stream immediately, which should significantly improve performance for this very common case.

This should be possible without breaking the current API at all.
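To make the two-tier idea concrete, here is a small in-memory model, not Redis code and not SAQ's logic: `ready` stands in for the stream and `deferred` (a heap ordered by run time) stands in for the sorted set. The class and method names are hypothetical. In Redis the promotion step would also need to be atomic, which this sketch does not attempt to show.

```python
import heapq
from collections import deque
from dataclasses import dataclass, field


@dataclass
class TwoTierQueue:
    """In-memory stand-in: `ready` plays the stream, `deferred` the sorted set."""

    ready: deque = field(default_factory=deque)
    deferred: list = field(default_factory=list)  # min-heap of (run_at, job_id)

    def enqueue(self, job_id: str, *, now: float, delay: float = 0.0) -> None:
        if delay <= 0:
            # Common case: immediate jobs skip the sorted set entirely.
            self.ready.append(job_id)
        else:
            heapq.heappush(self.deferred, (now + delay, job_id))

    def promote_due(self, now: float) -> int:
        """Move every deferred job whose run time has passed onto the ready queue."""
        moved = 0
        while self.deferred and self.deferred[0][0] <= now:
            _run_at, job_id = heapq.heappop(self.deferred)
            self.ready.append(job_id)
            moved += 1
        return moved
```

Only delayed jobs pay the cost of the ordered structure; everything else goes straight to the queue the workers read from.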

4. Avoid polling 🇵🇱

Mostly for latency reasons it would be nice to avoid polling. The idea would be:

  • XREADGROUP with BLOCK on streams means we'll no longer need to poll for the next jobs in the worker
  • we can use pubsub to notify waiting code of job results to avoid polling there

5. OpenTelemetry 🔭

Observability is close to our hearts at Pydantic, so it would be nice to have optional support for OpenTelemetry, or perhaps just hook points to implement your own observability.

This should be possible without breaking the current API at all.

6. DAG - Task Dependency Graph 📈

The idea is to allow one or more jobs to be triggered by one or more previous jobs.

See the then() and start_with() methods in the partial implementation below.

This should be possible without breaking the current API at all.

7. CLI, settings and worker.run improvements 🏃

We can mostly just copy uvicorn. We should also remove the very ugly WorkerSettings and configure the worker via a simple function.

We should also fix reload logic to use watchfiles.

This can be done such that existing code still works, with or without deprecation warnings.

8. Separate the backend ↔️

We should separate the Redis logic to make it easier to provide alternative backends. An in-memory backend would be especially useful for unit testing applications.

This can be done such that existing code still works, with or without deprecation warnings.
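One way such a split could look, as a rough sketch only: the `Backend` protocol, its `push`/`pop` methods and the job dictionary shape are all hypothetical and are not part of Arq's current API. The in-memory variant simply keeps jobs in an `asyncio.Queue`.

```python
import asyncio
from typing import Any, Protocol


class Backend(Protocol):
    """Hypothetical minimal interface a queue backend might expose."""

    async def push(self, job: dict[str, Any]) -> None: ...

    async def pop(self) -> dict[str, Any]: ...


class InMemoryBackend:
    """Test-only backend that keeps jobs in an asyncio.Queue."""

    def __init__(self) -> None:
        self._jobs: asyncio.Queue = asyncio.Queue()

    async def push(self, job: dict[str, Any]) -> None:
        await self._jobs.put(job)

    async def pop(self) -> dict[str, Any]:
        return await self._jobs.get()


async def run_one(backend: Backend, functions: dict[str, Any]) -> Any:
    """Pop a single job and call the matching coroutine function."""
    job = await backend.pop()
    return await functions[job['function']](*job['args'])
```

A unit test can then enqueue and run jobs in one process without a Redis server, as long as the backend is created inside the running event loop.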

9. Better documentation 📖

Documentation should move to mkdocs material and mkdocstrings, and be improved significantly.

10. Moving repo to Pydantic 🏢

To provide the resources for this work, we should move Arq to the Pydantic organization, and the docs to arq.pydantic.dev or similar.

Have I missed anything?


API Sketch

Here's a sketch of how I see the new type-safe API working, together with a partial implementation:

Example Usage:

from __future__ import annotations as _annotations

from dataclasses import dataclass
from typing import AsyncIterator
from contextlib import asynccontextmanager

from arq import FunctionContext, WorkerApp

from httpx import AsyncClient


@dataclass
class MyWorkerDeps:
    """
    Type safe way of defining the dependencies of the worker functions.

    E.g. HTTP client, database connection, settings, etc.
    """
    http_client: AsyncClient


@asynccontextmanager
async def my_worker_lifespan() -> AsyncIterator[MyWorkerDeps]:
    async with AsyncClient() as http_client:
        yield MyWorkerDeps(http_client)


worker_app = WorkerApp(lifespan=my_worker_lifespan)


@worker_app.register
async def foo(ctx: FunctionContext[MyWorkerDeps], url: str) -> int:
    # ctx.deps here is of type MyWorkerDeps, that's enforced by static typing
    # FunctionContext will also provide access to a redis connection, retry count,
    # even results of other jobs etc.
    r = await ctx.deps.http_client.get(url)
    r.raise_for_status()
    print(f'{url}: {r.text[:80]!r}...')
    return len(r.text)


async def main() -> None:
    async with worker_app:
        # these two are equivalent, param spec means the arguments are type safe
        await foo.enqueue('https://microsoft.com').start()
        await foo.enqueue('https://microsoft.com')
        # same, delayed by 10 seconds with 5 second timeout
        await foo.enqueue('https://microsoft.com').start(delay_by=10, timeout=5)

        # call foo directly in the same process
        print('length:', await foo.direct('https://github.com'))


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())

Partial Implementation:

from __future__ import annotations as _annotations

from dataclasses import dataclass, KW_ONLY
from datetime import timedelta
from typing import ParamSpec, TypeVar, Generic, Concatenate, overload, Self, Any, AsyncIterator, Generator
from collections.abc import Awaitable, Callable
from contextlib import asynccontextmanager, AbstractAsyncContextManager

from arq.connections import RedisSettings

WD = TypeVar('WD')  # worker dependencies
P = ParamSpec('P')  # worker function parameters
R = TypeVar('R')  # worker function return type
PNext = ParamSpec('PNext')
RNext = TypeVar('RNext')


@dataclass
class Job(Generic[R]):
    """
    Represents a job that has been enqueued, could be deferred, queued, running or finished.
    """
    id: str

    async def status(self) -> str: ...

    async def result(self) -> R: ...


@dataclass
class PendingJob(Generic[WD, P, R]):
    """
    Represents a job that has not been enqueued yet.
    """

    deferred_deps: Callable[[], WD]
    func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]
    args: tuple[Any, ...]
    kwargs: dict[str, Any]
    _: KW_ONLY
    timeout: timedelta | int | None = None

    def __await__(self) -> Generator[None, None, Job]:
        return self.start().__await__()

    async def start(self, *, delay_by: timedelta | int | None = None) -> Job:
        # also other kwargs delay_until, timeout, retry etc.
        print(f'starting job {self.func.__name__}(args={self.args}, kwargs={self.kwargs}) {delay_by=}')
        return Job(id='123')

    def then(self,
            *on_success: WorkerFunction[WD, PNext, RNext] | PendingJob[WD, PNext, RNext]
        ) -> PendingJob[WD, PNext, RNext]:
        """
        also takes all kwargs from `start()`

        TODO - AFAIK there's no way to enforce that `PNext` is `(R,)` - e.g. that the return value of the
        first function is the input to the second function.

        The even more complex case is where you have multiple jobs triggering a job, where I'm even more sure
        full type safety is impossible.

        I would therefore suggest that subsequent jobs are not allowed to take any arguments, and instead
        access the results of previous jobs via `FunctionContext`
        """
        ...

    def start_with(self, *also_start: WorkerFunction[WD, P, R]) -> WorkerFunction[WD, P, R]:
        # also takes all kwargs from `start()`, I think this can be entirely type safe
        ...


@dataclass
class WorkerFunction(Generic[WD, P, R]):
    deferred_deps: Callable[[], WD]
    func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]
    timeout: timedelta | int | None = None

    def enqueue(self, *args: P.args, **kwargs: P.kwargs) -> PendingJob[WD, P, R]:
        return PendingJob(self.deferred_deps, self.func, args, kwargs, timeout=self.timeout)

    async def direct(self, *args: P.args, **kwargs: P.kwargs) -> R:
        return await self.func(FunctionContext(self.deferred_deps()), *args, **kwargs)


@dataclass
class FunctionContext(Generic[WD]):
    """
    Context provided to worker functions, contains deps but also a connection, retry count etc.
    """

    deps: WD


@asynccontextmanager
async def none_lifespan() -> AsyncIterator[None]:
    yield None


Unset = object()


class WorkerApp(Generic[WD]):
    def __init__(
        self,
        *,
        redis_settings: RedisSettings | str = 'redis://localhost',
        lifespan: Callable[[], AbstractAsyncContextManager[WD]] = none_lifespan,
    ):
        self.redis_settings = redis_settings
        self.lifespan = lifespan()
        self.deps: WD | Unset = Unset  # type: ignore

    @overload
    def register(self, func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]], /) -> WorkerFunction[WD, P, R]: ...

    @overload
    def register(
        self, *, timeout: timedelta | int
    ) -> Callable[[Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]], WorkerFunction[WD, P, R]]: ...

    def register(
        self, func: Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]] | None = None, *, timeout: timedelta | int | None = None
    ) -> WorkerFunction[WD, P, R] | Callable[[Callable[Concatenate[FunctionContext[WD], P], Awaitable[R]]], WorkerFunction[WD, P, R]]:
        if func is None:
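            # pass the timeout through so `@worker_app.register(timeout=...)` is not silently dropped
            return lambda func: WorkerFunction(self._deferred_deps, func, timeout=timeout)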
        return WorkerFunction(self._deferred_deps, func, timeout=timeout)

    def _deferred_deps(self) -> WD:
        if self.deps is Unset:
            raise RuntimeError('WorkerApp is not started')
        return self.deps

    async def startup(self) -> None:
        self.deps = await self.lifespan.__aenter__()

    async def shutdown(self, *args: Any) -> None:
        await self.lifespan.__aexit__(*args)

    async def __aenter__(self) -> Self:
        await self.startup()
        return self

    async def __aexit__(self, *args: Any) -> None:
        await self.shutdown(*args)

    async def run_queued_jobs(self) -> list[Any]:
        """
        Run jobs already in the queue, useful for testing.

        Returns a list of jobs results.
        """
        ...
@ddanier
Sponsor

ddanier commented Mar 17, 2024

Great to hear all of that. I would very much like arq to actually get better and better. We are using arq in many of our projects.

One additional thing might be to allow some kind of locking for the jobs. I could imagine this working like resource_group in GitLab (see https://docs.gitlab.com/ee/ci/resource_groups/), which allows only one job to run at a time for each resource group. This would let users easily ensure that no two jobs try to change the same thing, and so solve many of the race conditions one can have when using async tasks.

@samuelcolvin
Owner Author

That's very easily done in redis, I guess no reason for us not to add a utility.
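For illustration, a minimal sketch of such a lock built on `SET ... NX EX`, assuming `redis` is a redis-py asyncio client created with `decode_responses=True`. The `resource_lock` helper and the `arq:lock:` key prefix are hypothetical, not an existing Arq utility.

```python
import uuid
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def resource_lock(redis: Any, group: str, *, ttl: int = 60) -> AsyncIterator[bool]:
    """Yield True if this caller holds the lock for `group`, False otherwise."""
    key = f'arq:lock:{group}'  # hypothetical key prefix
    token = uuid.uuid4().hex
    # SET key token NX EX ttl: succeeds only if the key does not exist yet,
    # and expires on its own if the holder dies.
    acquired = bool(await redis.set(key, token, nx=True, ex=ttl))
    try:
        yield acquired
    finally:
        # Only delete our own lock. GET followed by DEL is not atomic; a real
        # implementation would do this compare-and-delete in a Lua script.
        if acquired and await redis.get(key) == token:
            await redis.delete(key)
```

A job would wrap its critical section in `async with resource_lock(redis, 'some-group') as held:` and skip or retry when `held` is False.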

@tiangolo
Sponsor

We'll provide the same or similar method for enqueuing a job via its name for cases where the worker code is not accessible where jobs are enqueued.

I love this part! That's the main reason I had gone for Celery over RQ and others (many years ago, long before FastAPI). That required weird tricks in how to define the Celery code. Having something that takes this into account first-class sounds amazing.


It would be great if it could also support regular def functions on top of async functions, like Starlette/FastAPI, running them on a thread worker (anyio.to_thread.run_sync()). I think this would simplify migrating sync codebases, or adopting it for mainly sync codebases.
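A minimal sketch of that dispatch, using the standard library's `asyncio.to_thread` (Python 3.9+) in place of the `anyio.to_thread.run_sync()` call mentioned above. The `call_job` helper is hypothetical and not part of Arq.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_job(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Await coroutine functions directly; run plain functions in a worker thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # Blocking code runs in a thread so the event loop stays free for other jobs.
    return await asyncio.to_thread(func, *args, **kwargs)
```

The worker could call every registered function through a helper like this, so `def` and `async def` jobs share one code path.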

@dhruv-ahuja

It would be great if it could also support regular def functions on top of async functions, like Starlette/FastAPI, running them on a thread worker (anyio.to_thread.run_sync())

I strongly support this. We have a synchronous codebase at work for a current project using FastAPI. We had to go with rq for the queuing solution (Celery was a bit too heavy for our use case). It works great, but I would have loved to use arq to future-proof the project with async support; that wasn't possible.

@Kludex
Sponsor Collaborator

Kludex commented Mar 18, 2024

I would like to see automatic documentation with AsyncAPI.

@SlavaSkvortsov

Thank you very much for working on arq - it's a great project, we've been using it a lot and I'm happy it's getting some love <3

It would be great to have out-of-the-box priorities for jobs. Currently, we're using this solution. We can't adopt the approach with different queues because we need to keep the same limited number of concurrent jobs and sometimes execute urgent jobs ahead of the others.

@samuelcolvin
Owner Author

I think you can run stream consumers on different streams with different priorities, so that should be possible.

@rednafi

rednafi commented Mar 18, 2024

One thing I’d love to see is first party support for a UI to see job status, worker conditions, ability to start or stop enqueued jobs, search jobs, filter jobs by workers, delete jobs, etc.

Maybe FastUI can be used to build a robust solution for that.

A lot of the time, using Celery-like tools is a pain because there’s no well supported first-party UI for monitoring and taking interactive actions. If arq can tackle that status quo, it’ll be awesome.

@frankie567

Thank you @samuelcolvin for the renewed efforts on this project 🙏 As @birkjernstrom mentioned on Twitter, we use Arq as our worker backend for Polar.

One feature I would love to see is middleware. Most of the time, we have logic/behaviors to execute before/after a job is run (logging, context management...). In the current version, we have access to the on_job_start and on_job_end hooks, but one downside is that they don't give access to the task arguments.

Currently, we work around this by implementing decorators we apply to our worker tasks: https://github.com/polarsource/polar/blob/98f4696e95755e93019b0c657c6b08dff64ea02a/server/polar/worker.py#L150-L189

A middleware approach would be very neat and allow us to wrap any kind of logic without having to touch the tasks themselves, similar to what we can do with web frameworks like Starlette.
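As a rough illustration of the shape this could take (all names here are hypothetical, and this is neither Arq's API nor Polar's decorator code): each middleware receives the next callable and returns a wrapper that sees the context and the task arguments.

```python
from typing import Any, Awaitable, Callable

JobCall = Callable[..., Awaitable[Any]]
Middleware = Callable[[JobCall], JobCall]


def apply_middleware(func: JobCall, middlewares: list[Middleware]) -> JobCall:
    """Wrap `func` so that the first middleware in the list is the outermost layer."""
    for middleware in reversed(middlewares):
        func = middleware(func)
    return func


def logging_middleware(log: list[str]) -> Middleware:
    """Example middleware: records the task arguments before and after the job runs."""

    def middleware(call_next: JobCall) -> JobCall:
        async def wrapper(ctx: dict[str, Any], *args: Any, **kwargs: Any) -> Any:
            log.append(f'start args={args} kwargs={kwargs}')
            try:
                return await call_next(ctx, *args, **kwargs)
            finally:
                log.append('end')

        return wrapper

    return middleware
```

The worker would apply the configured list once per registered function, so the tasks themselves stay untouched.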

@samuelcolvin
Owner Author

Yup, totally agree - middleware makes lots of sense and should be fairly easy to implement compared to some of the other stuff here.

@samuelcolvin
Owner Author

@rednafi I agree on dashboards, they're something we're thinking about lots at Pydantic.

@epicwhale

Many thanks for the renewed interest and roadmap! :)

My Wishlist:

  • Better IDE / type hinting. I tend to have a bunch of # type: ignore around my arq codebase.
  • Unique Jobs (without using keep_result = 0): a straightforward way to prevent queueing a job if it's a) already in the queue or b) under execution. But OKAY to queue it if it's 'completed' or has a result saved in redis. This is useful for the Cron tasks actually (say I have a job that I want to run every 5 mins). Right now I have to use keep_result = 0 as a workaround, or find a way to clear the job result before it can be queued again. There's also more context on this confusion (and suggested documentation) in Cannot enqueue job with a _job_id again, despite execution of the job being fully completed? #432
  • An easy way to invoke a job during development via CLI, with easy VS Code debug support. Right now I use jupyter notebooks to trigger a job during development.
  • Starting a Worker without it executing the cron_jobs registered inside it. This is useful during development, where I'm working on a single job - but I don't want the crons to run in the midst of my development / debugging.

@samuelcolvin
Owner Author

  • Better IDE / type hinting. I tend to have a bunch of # type: ignore around my arq codebase.

Agreed, I think that's covered above - I think we can even do that for jobs where the worker code is not in the search path of the queuing logic.

  • Unique Jobs (without using keep_result = 0): a straightforward way to prevent queueing a job if it's a) already in the queue or b) under execution. But OKAY to queue it if it's 'completed' or has a result saved in redis. This is useful for the Cron tasks actually (say I have a job that I want to run every 5 mins). Right now I have to use keep_result = 0 as a workaround, or find a way to clear the job result before it can be queued again. There's also more context on this confusion (and suggested documentation) in Cannot enqueue job with a _job_id again, despite execution of the job being fully completed? #432

Makes sense, I think that's usable.

  • An easy way to invoke a job during development via CLI, with easy VS Code debug support. Right now I use jupyter notebooks to trigger a job during development.

But how would you define the arguments? I guess we could either define them via a python file, or as JSON in the command line? Maybe you could create a separate issue to discuss this?

  • Starting a Worker without it executing the cron_jobs registered inside it. This is useful during development, where I'm working on a single job - but I don't want the crons to run in the midst of my development / debugging.

Yup, seems pretty easy - I guess we allow you to define separate WorkerApps (see the example above), then run just one of them. With that you could put all your cron jobs in one app and not run it during development.

@JonasKs
Sponsor Collaborator

JonasKs commented Mar 18, 2024

I'm really, really excited about this! Thanks for all the effort throughout the years @samuelcolvin, and to everyone helping out in issues or with PRs.

I think you're pretty spot on with these ideas, in addition to what @frankie567 writes about the middleware. I'd also like to add that while we have queues today, the queue name is not used for e.g. storage of results, causing confusing behavior.

I'm also very much in favor of OpenTelemetry + a dashboard/API (#297), though I don't know if a dashboard necessarily needs to be shipped with arq.

As for the API - it looks really clean. 👏

@Blaise-93

Thank you Colvin for trying to bootstrap this project, it is really a great project. I would love to see it get proper documentation. An async worker need not be as feature rich as Celery; we need a library that is simple and does the work effectively and efficiently.

@epicwhale

Few more:

  • an easy way to delete the result of the job (from redis) using a job_id string. This is so I can reuse the _job_id to enqueue the job again.
  • When a job fails due to an error, it should respect the keep_result configuration of the job definition (from the enqueue_job call or cron job) from where it was invoked. At the moment it looks like when a job fails, its results get saved - a bug reported by someone else too: Use function's keep_results configuration when storing failed jobs results #433

@KShah707

Would be awesome if you could first skim through the backlog of approved / tested PRs waiting for merge @samuelcolvin. There are some really simple but high-value ones like #378 that would probably help a lot of people.

@samuelcolvin
Owner Author

There are some really simple but high-value ones like #378 that would probably help a lot of people.

Done. I'm working through PRs now.

@samuelcolvin
Owner Author

v0.26.0b1 is released, please try it, I'll release v0.26 at the end of the week, see #441.

@iamlikeme
Contributor

Job uniqueness in arq is opt-in, i.e. you opt-in by crafting deterministic job ids. In my team we heavily rely on job uniqueness (in a huge majority of cases we do not want concurrent runs of the same worker function with the same arguments) - to the extent that we wanted it to be opt-out. We accomplish this by generating default job_ids, something along these lines:

@worker_app.register(
    job_id="foo:{a}"  # template string, may refer to arguments passed to the worker function
)
async def foo(ctx: FunctionContext, a: str, b: int) -> int:
    ...

I think this is simple and generally useful. I'm not sure if this is the right thread for this sort of feature request, so sorry if it is not appropriate here.
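A minimal sketch of how such a template could be rendered from the call arguments, assuming the worker function's first parameter is the context. The `render_job_id` helper is hypothetical; it only shows the binding step, not how Arq would store or check the id.

```python
import inspect
from typing import Any, Callable


def render_job_id(template: str, func: Callable[..., Any], *args: Any, **kwargs: Any) -> str:
    """Fill `template` from the arguments the job would be called with (ctx excluded)."""
    # Drop the leading ctx parameter, which is supplied by the worker, not the caller.
    params = list(inspect.signature(func).parameters.values())[1:]
    bound = inspect.Signature(params).bind(*args, **kwargs)
    bound.apply_defaults()
    return template.format(**bound.arguments)
```

Binding through the signature means positional and keyword calls with the same values produce the same id.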

[@JonasKs] I'm also very much in favor of OpenTelemetry + a dashboard/API (#297), though I don't know if a dashboard necessarily needs to be shipped with arq.

I wholeheartedly agree. I've once made an attempt to make a generic dashboard for arq and I found it problematic that pickle is used as the default serializer, because unpickling is reliable only in an environment where the same version of arq is installed. If default serialization was e.g. to JSON it would be much easier to write a dashboard that can be shipped separately from arq.

@cmangla
Contributor

cmangla commented Apr 3, 2024

@samuelcolvin Many thanks for creating arq and for leading the project over the years.

I am glad to see the wide-ranging uses arq finds and your plan for new features and refinements. However, I do hope that arq remains relatively light-weight and easy to get started with. I used it for the first time during an internship, when I was short on time. The documentation for arq is concise and thorough, and it has few moving parts. I hope that we can, broadly, maintain these qualities.

@cmangla
Contributor

cmangla commented Apr 3, 2024

In summary I want to significantly refactor the internals

I have an offhand suggestion. In CPython 3.13, we will get a Python API for "Multiple Interpreters" as per PEP 734. This will allow us to launch interpreters in separate OS threads, in a single OS process. The interpreters don't share a GIL, so that gives us multi-core parallelism. It should be possible for arq to use this facility to launch multiple workers, each in a separate interpreter OS thread, in a single process. Workers would then have the same multi-core parallelism that they otherwise have in separate processes. The advantage would be memory saving. For example, in an ML application, if each worker needs to pre-process the same dataset and keep it resident in memory, that could be a common global object between workers on the same machine.

We should separate the Redis logic to make it easier to provide alternative backends, an in memory backend for testing would be especially useful for unit testing applications.

Another offhand suggestion. For the testing backend, we could use PEP 734 to launch a worker and job-submitter in the same process in separate interpreters, then use the inter-interpreter sharing facilities of PEP 734 for a rudimentary in-memory backend.

@waza-ari

waza-ari commented May 1, 2024

First of all, thanks for the project and dedication! It's taken for granted quite often, but it takes a lot of time to maintain OS projects.

One suggestion would be custom exception handlers, similar to what FastAPI offers. We're exclusively using structlog for JSON based log output, and custom error handlers would help in:

  • Handling failed jobs gracefully
  • Being able to serialise error messages into JSON, so that they're properly handled by structlog
  • Potentially even achieving job-specific retry behaviour, although I don't see any immediate need for this one
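A small sketch of what handler lookup by exception type could look like, loosely modelled on the idea of FastAPI's exception handlers. The function names and the JSON shape are hypothetical, and nothing here is an existing Arq hook.

```python
import json
from typing import Any, Awaitable, Callable, Optional

ExcHandler = Callable[[BaseException], Any]


def find_handler(handlers: dict, exc: BaseException) -> Optional[ExcHandler]:
    """Return the handler registered for the most specific matching exception class."""
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls]
    return None


async def run_with_handlers(job: Callable[[], Awaitable[Any]], handlers: dict) -> Any:
    """Run `job`; on failure, delegate to a registered handler or re-raise."""
    try:
        return await job()
    except Exception as exc:
        handler = find_handler(handlers, exc)
        if handler is None:
            raise
        return handler(exc)


def json_error(exc: BaseException) -> str:
    """Example handler: turn the error into a JSON string that a logger can emit."""
    return json.dumps({'error': type(exc).__name__, 'message': str(exc)})
```

Walking the exception's MRO lets a handler registered for a base class also cover its subclasses, while unhandled errors still fail the job as before.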

@JonasKs
Sponsor Collaborator

JonasKs commented May 1, 2024

We should also try to make health checks k8s-friendly. The health check for the worker should probably only check whether the worker can connect to the backend and query the task queue, and not be bound to tasks being executed.

Today, a health check is recorded only if:

  • it's time to record a health check
  • after a batch of jobs has been completed

This setup fails if:

  • pod starts
  • jobs start
  • liveness probe polls, no health check has been recorded (arq --check)
  • SIGTERM is sent to the pod
  • job cancels

We ran into this behavior this week, where our setup is:

  • max_jobs 1
  • job queued, job takes ~30m to finish
  • liveness probe fails
  • pod shuts down
  • repeat forever

@espdev

espdev commented May 1, 2024

@samuelcolvin
Thank you

v0.26.0b1 is released, please try it, I'll release v0.26 at the end of the week, see #441.

What are the plans for the release?

@samuelcolvin
Owner Author

Oh 🤦 , I'll do the release today.

So sorry, all I can say is, it's been a busy few weeks.

@Wh1isper

Wh1isper commented May 6, 2024

I have just discovered this very good project. Thank you!

In my scenario, I would prioritize the following two needs:

Are they in the refactoring plan? Maybe I can try to contribute some PRs without interfering with the refactoring.

@gaby

gaby commented May 7, 2024

How does this compare to something like FastStream (formerly Propan) ?

https://github.com/airtai/faststream

@espdev

espdev commented May 7, 2024

@gaby

They are different tools for different purposes and applications.

  • arq is a library for job queuing to run background and scheduled tasks
  • FastStream is a pub/sub framework to build asynchronous services with event-driven architecture

@Graeme22

Another thing that would be nice to have: cron jobs that accept typical cron patterns (e.g. "*/5", "9-13"), as the sets are a bit repetitive.
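For illustration, a minimal sketch of expanding one cron field into the kind of integer set that is currently written by hand. The `expand_cron_field` helper is hypothetical and covers only `*`, ranges, lists and steps over `*` or a range, not the full cron syntax.

```python
def expand_cron_field(pattern: str, low: int, high: int) -> set[int]:
    """Expand one cron field ('*', '*/5', '9-13', '1,15', '10-30/10') into a set of ints."""
    values: set[int] = set()
    for part in pattern.split(','):
        rng, _, step_str = part.partition('/')
        step = int(step_str) if step_str else 1
        if step < 1:
            raise ValueError(f'invalid step in {part!r}')
        if rng == '*':
            start, end = low, high
        elif '-' in rng:
            start_str, end_str = rng.split('-', 1)
            start, end = int(start_str), int(end_str)
        else:
            start = end = int(rng)
        if not (low <= start <= end <= high):
            raise ValueError(f'{part!r} is outside {low}-{high}')
        values.update(range(start, end + 1, step))
    return values
```

The `low` and `high` bounds depend on the field, for example 0 and 59 for minutes or 0 and 23 for hours.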

@evgenii-moriakhin

Very excited that you want to improve this library. Asynchronous Python doesn't have enough simple task execution libraries like this (and you can't deploy giants like Temporal in every project).

@Wh1isper

Wh1isper commented May 19, 2024

I have just discovered this very good project. Thank you!

In my scenario, I would prioritize the following two needs:

Are they in the refactoring plan? Maybe I can try to contribute some PRs without interfering with the refactoring.

Inspired by this project, I developed brq for my own needs, based on Redis streams (a simple task queue library that can be used with a Redis cluster without the watch/pipeline commands).

@Lancetnik

Lancetnik commented May 27, 2024

@gaby

They are different tools for different purposes and applications.

  • arq is a library for job queuing to run background and scheduled tasks
  • FastStream is a pub/sub framework to build asynchronous services with event-driven architecture

But now we are talking about

We'll provide the same or similar method for enqueuing a job via its name for cases where the worker code is not accessible where jobs are enqueued.

I would like to see automatic documentation with AsyncAPI.

Separate the backend

These things make arq the same kind of tool as FastStream, so why should we build them? If we are talking about a tool for asynchronous services, we already have one. If we are talking about a task framework, arq is a good choice. Should we mix these use cases?
