Job aborts failing #405

Open
rm-21 opened this issue May 31, 2023 · 5 comments

Comments

@rm-21
Contributor

rm-21 commented May 31, 2023

I have set up a sample project: arq-scale-test. I love Arq, but somehow task abort is failing for me. Any help or insights would be appreciated.

I have a simple long-running task that sleeps for 60 seconds.
Five tasks are queued and started in app.py.
In checks.py, I attempt to check the status of a task and stop it. Retrieving the task status works, but the call to await job.abort() just hangs. The task actually runs to completion after 60 seconds, and only then do I get job aborted as False.

Adding the code below as well:

# config.py
from typing import Any

from arq.connections import RedisSettings
from pydantic import BaseSettings
from pydantic import validator


class Settings(BaseSettings):
    ENVIRONMENT: str | None

    # Redis backend
    REDIS_HOST: str
    REDIS_PORT: int

    # ARQ Config
    ARQ_REDIS_SETTINGS: RedisSettings | None = None
    ARQ_MAX_JOBS: int = 5
    ARQ_JOB_TIMEOUT: int = 9 * 60 * 60  # 9hrs in seconds
    ARQ_KEEP_RESULT: int = 9 * 60 * 60  # 9hrs in seconds
    ARQ_MAX_TRIES: int = 1
    ARQ_HEALTH_CHECK_INTERVAL: int = 5 * 60  # 5 minutes in seconds
    ARQ_RETRY_JOBS: bool = False
    ARQ_ALLOW_ABORT_JOBS: bool = True

    @validator("ARQ_REDIS_SETTINGS")
    @classmethod
    def assemble_arq_redis_settings(
        cls, value: RedisSettings | None, values: dict[str, Any]
    ) -> RedisSettings:
        if isinstance(value, RedisSettings):
            return value
        settings = RedisSettings(
            host=values["REDIS_HOST"],
            port=values["REDIS_PORT"],
            database=1,
        )
        return settings

    class Config(BaseSettings.Config):
        case_sensitive: bool = True
        env_file: str = ".env"


def get_settings() -> Settings:
    return Settings()


settings: Settings = get_settings()
# arq_app.py
import asyncio
from typing import Any

from .config import settings


async def long_running_func(
    ctx: dict[Any, Any] | None,
) -> None:
    await asyncio.sleep(60)


def worker_dispatcher() -> type:
    class ArqWorker:
        # Functions to schedule
        functions = [long_running_func]

        # ARQ settings
        redis_settings = settings.ARQ_REDIS_SETTINGS
        max_jobs = settings.ARQ_MAX_JOBS
        job_timeout = settings.ARQ_JOB_TIMEOUT
        keep_result = settings.ARQ_KEEP_RESULT
        max_tries = settings.ARQ_MAX_TRIES
        health_check_interval = settings.ARQ_HEALTH_CHECK_INTERVAL
        retry_jobs = settings.ARQ_RETRY_JOBS
        allow_abort_jobs = settings.ARQ_ALLOW_ABORT_JOBS

    return ArqWorker


worker0 = worker_dispatcher()
worker1 = worker_dispatcher()
worker2 = worker_dispatcher()
worker3 = worker_dispatcher()
worker4 = worker_dispatcher()
# app.py
import asyncio

from arq import create_pool

from .config import settings


async def create_jobs(num: int) -> None:
    redis = await create_pool(settings.ARQ_REDIS_SETTINGS)

    for i in range(num):
        job = await redis.enqueue_job("long_running_func")
        assert job is not None
        print(f"{i + 1}: Job ID: {job.job_id} | {await job.status()}")


if __name__ == "__main__":
    asyncio.run(create_jobs(5))
# checks.py
import asyncio

from arq import create_pool
from arq.jobs import Job

from .config import settings


async def job_status(task_id: str, stop: bool = False) -> None:
    job: Job = Job(
        job_id=task_id,
        redis=await create_pool(settings_=settings.ARQ_REDIS_SETTINGS),
    )

    print(f"ID {task_id}: {await job.status()}")

    if stop:
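        # NB (my assumption, based on arq's Job.abort docstring): abort() polls
        # for the job result, and the default timeout=None waits indefinitely,
        # which would match the hang described above.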
        result: bool = await job.abort()
        print(f"ID {task_id}: Aborted: {result}")


async def main() -> None:
    await job_status(task_id="6b3fa6d3e4a34c0c92ac56687c1e77bd", stop=True)


if __name__ == "__main__":
    asyncio.run(main())
@rm-21
Contributor Author

rm-21 commented Jun 5, 2023

I might have identified the issue.

It has to do with self.sem. Within the _poll_iteration function, among other things, jobs are started inside the if self.allow_pick_jobs: block. Once the semaphore is full, the code blocks until a task finishes, and that block applies to the entire MainProcess/MainThread a given Arq worker runs in.

That means if a worker allows 5 tasks and I schedule all 5, the following won't happen until a task completes (see the sketch after the list):

  • Checking & aborting jobs
  • Heartbeat check
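
As a rough sketch of the pattern (heavily simplified, not arq's actual code; sem, allow_pick_jobs and max_jobs stand for the worker attributes mentioned above):

import asyncio


async def poll_iteration(sem: asyncio.Semaphore, allow_pick_jobs: bool) -> None:
    if allow_pick_jobs:
        async with sem:  # blocks here while all max_jobs slots are busy
            pass  # placeholder: read queued job ids from redis
        # placeholder: start the jobs; each running job holds one semaphore
        # slot until it finishes, so the next iteration blocks above
    # only reached once a slot is free:
    # - check for & cancel aborted jobs
    # - record the heartbeat / health check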

Currently, the easiest workaround would be to keep a handle on the job returned by the enqueue_job call (with queue_names) and schedule a total of max_jobs - 1 tasks, as sketched below.
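
Something like this (a sketch based on app.py above; create_jobs_with_headroom is a hypothetical helper, and the point is leaving one semaphore slot free so the poll loop can still reach the abort check):

from arq import create_pool
from arq.jobs import Job

from .config import settings


async def create_jobs_with_headroom(num: int) -> list[Job]:
    redis = await create_pool(settings.ARQ_REDIS_SETTINGS)
    limit = settings.ARQ_MAX_JOBS - 1  # leave one slot free for the poll loop
    jobs: list[Job] = []
    for _ in range(min(num, limit)):
        job = await redis.enqueue_job("long_running_func")
        assert job is not None
        jobs.append(job)  # keep the handle so job.abort() can be called later
    return jobs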

What could be a potential better solution?

Also, is there something else I might be missing?

@JonasKs
Sponsor Collaborator

JonasKs commented Jun 5, 2023

Thanks, mate.
I think these should run regardless, so maybe an allowlist of tasks that should always be allowed to run?

I'm a bit AFK for a few weeks, due to IRL stuff.

@rm-21
Contributor Author

rm-21 commented Jun 6, 2023

No worries, @JonasKs. Take care.
And yep, that could work, but I'll try to debug the issue more closely this week and will post here if I find a good solution. Is there a contribution guide? I couldn't find one in the project.

@JonasKs
Sponsor Collaborator

JonasKs commented Jun 6, 2023

Sounds good 😊

No, not really. In short, for the tests to run you need a non-password-protected Redis on the default port, and pre-commit installed.

Clone the project, run make install, then pre-commit install. You can see all available commands (such as make test and make lint) in the Makefile.

If you use Docker, this docker-compose file will do:

version: '3.8'

services:
  redis:
    container_name: arq_redis
    image: redis:7-alpine
    ports:
      - '127.0.0.1:6379:6379'
    restart: always

@rm-21
Contributor Author

rm-21 commented Jun 10, 2023

I have attempted to solve the issue. Please let me know if any clarification or further information is needed. Happy to brainstorm the problem further in case there is a better solution.

P.S.: Suggestions for PR improvement are also welcome.
