Scheduling repeated unique jobs #457

Open
davidhuser opened this issue May 16, 2024 · 5 comments

davidhuser commented May 16, 2024

Thanks for the great library. Is it possible to schedule repeated unique jobs? My goal is to enqueue a job, ensure only one instance runs at a time, and re-enqueue it immediately after it finishes, possibly with a configurable interval.

I noticed that RQ offers rq-scheduler for this purpose.

Self-enqueuing, as mentioned in #432, might not be ideal.

Currently, I'm using a cron job with the second parameter set to every 10 seconds. However, the job duration varies:

from arq.connections import RedisSettings
from arq.cron import CronJob, cron

async def repeated_async_job(ctx):
    async with get_session(ctx) as db:  # get_session / do_db_stuff are app-specific helpers
        do_db_stuff(db)
    return 'success!'

class WorkerSettings:
    cron_jobs: list[CronJob] = [
        cron(
            repeated_async_job,
            second=set(range(0, 59, 10)),  # every 10 seconds
            unique=True,
            run_at_startup=False,
            timeout=60,
            max_tries=1,
            keep_result=0
        )
    ]
    redis_settings = RedisSettings()

Would it be better to handle scheduling with a different tool, like APScheduler's AsyncIOScheduler? If so, how would the scheduler know when the job is finished, and would it matter if it runs with multiple workers (e.g. Gunicorn)?
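For reference, this is roughly what I imagine an external scheduler would have to do before enqueuing the next run: check the status of the previous job by a fixed job id (just a sketch using arq's Job wrapper; 'app.main.repeated' is an example id):

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job, JobStatus

async def enqueue_if_finished():
    redis = await create_pool(RedisSettings())
    job = Job('app.main.repeated', redis)  # example fixed job id
    status = await job.status()
    # not_found: no job with this id yet; complete: the previous run finished
    if status in {JobStatus.not_found, JobStatus.complete}:
        # enqueue_job returns None if a job with this id already exists
        await redis.enqueue_job('repeated_async_job', _job_id='app.main.repeated')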


epicwhale commented May 16, 2024

I loosely recollect having to do this in the past... wouldn't defining a job_id for the cron(..) enforce uniqueness at run time? And to re-enqueue it immediately after it finishes, I believe the repeated_async_job(..) wrapper could enqueue it with the same job_id before it returns?
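Something like this is what I have in mind for the second part (untested sketch; the fixed id is made up):

async def repeated_async_job(ctx):
    # ... do the actual work ...

    # re-enqueue under the same fixed id just before returning;
    # enqueue_job returns None (no new job) if a job with this id still exists
    await ctx['redis'].enqueue_job('repeated_async_job', _job_id='repeated-async-job')
    return 'success!'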


davidhuser commented May 17, 2024

I'd prefer not to mix cron jobs with plain functions, although cron jobs are converted to functions internally. The example below is function-only.

Regarding your job_id suggestion: the first job can/should have a job_id, but subsequent jobs can't reuse it, because you can't re-enqueue a job with the same job_id from within the job itself while it hasn't finished yet.

I tried to work around this by querying the number of queued jobs for this function; if it's exactly 1 (the current job), then re-enqueue the next one:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # re-enqueue if there is exactly one queued job of this function (this job)
    redis = ctx['redis']
    queued_jobs = await redis.queued_jobs()
    queued_jobs_len = len([job for job in queued_jobs if job.function == 'repeated_async_job'])
    if queued_jobs_len == 0:
        print("ERROR: should not happen")
    elif queued_jobs_len == 1:
        # only the current job, so we can enqueue the next one, but without a job_id
        await redis.enqueue_job('repeated_async_job', _job_try=1)
    else:
        print("ERROR: too many jobs")


async def main():
    redis = await create_pool(RedisSettings())
    # startup job with unique ID
    await redis.enqueue_job('repeated_async_job', _job_id='app.main.startup', _job_try=1)


class WorkerSettings:
    functions = [repeated_async_job]


if __name__ == '__main__':
    asyncio.run(main())

It seems to work, but I'm not sure if there is a better/native way to do this.

I was also wondering what _job_try really does; the docs say:

enqueue_job
_job_try – useful when re-enqueueing jobs within a job

but how is it useful?

@epicwhale

Ah, now I remember that's where I got stuck: how do you re-enqueue a job_id when one is already running with the same id (or if its result is saved, but not explicitly retrieved/deleted)?

Regarding _job_try: my understanding is that _job_try explicitly sets which 'retry attempt' number the enqueued job should be treated as. It should be accessible as ctx['job_try'] inside the job, and it is used by arq wherever job_try is referenced: https://github.com/search?q=repo%3Asamuelcolvin%2Farq+job_try&type=code. To the best of my knowledge, it does not help in any way with the _job_id situation here.
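A rough illustration of how I understand it surfaces inside a job (sketch; flaky_job and the retry policy are made up):

from arq import Retry

async def flaky_job(ctx):
    # ctx['job_try'] starts at 1 and increments on each retry,
    # or takes whatever value was passed as _job_try on enqueue
    if ctx['job_try'] < 3:
        raise Retry(defer=5)  # ask arq to retry this job in 5 seconds
    return f"succeeded on attempt {ctx['job_try']}"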

If you find a better solution, keen to learn too!


davidhuser commented May 18, 2024

Since a job can have more states than just queued, I'm now using this check before enqueuing:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import JobStatus


async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # Check if any job with the same function is deferred, queued, or in progress
    pool = ctx['redis']
    all_jobs = await pool.all_job_results()
    in_progress_jobs = [
        job for job in all_jobs
        if job.status in {JobStatus.deferred, JobStatus.queued, JobStatus.in_progress}
        and job.function == 'repeated_async_job'
    ]

    if in_progress_jobs:
        return 'done'

    await pool.enqueue_job('repeated_async_job')
    return 'done'


async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('repeated_async_job')


class WorkerSettings:
    functions = [repeated_async_job]


if __name__ == '__main__':
    asyncio.run(main())

It does not account for params (i.e. the same job with different parameters), but for this job I don't need that.
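If parameters ever mattered, I suppose the same kind of check could also compare the enqueued arguments, e.g. via queued_jobs(), which exposes the function name and args (sketch):

async def has_queued_duplicate(redis, function_name, args):
    # queued_jobs() returns JobDef entries with .function, .args and .kwargs
    queued = await redis.queued_jobs()
    return any(j.function == function_name and j.args == args for j in queued)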

@epicwhale

@davidhuser are you facing the issue I've filed in #459 by any chance, or do you know how to solve it? There, an in-progress key is created for 60 seconds, even for a cron job that I want to run every 5 or 10 seconds.
