
Fix airflow tasks run --local when dags_folder differs from that of processor #26509

Merged
merged 9 commits into apache:main from store-relative-loc-in-db on Sep 28, 2022

Conversation

dstandish
Contributor

@dstandish dstandish commented Sep 19, 2022

Currently, if your dags_folder differs from the one used by the dag processor, then tasks run --local will fail to find the dag, because it uses the fileloc value stored in the serialized dag.

To resolve this, I store the dags_folder as configured in the dag processor (or whatever process does the serializing), which lets us correctly determine the relative path even when the current process's dags folder is different.
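Roughly, the idea is that the serializing process records its own dags_folder next to the DAG. A minimal sketch of the serialization side (the field name here is illustrative, not necessarily the exact one used in this PR):

from airflow.configuration import conf

def serialize_dag_metadata(dag) -> dict:
    # Illustrative sketch: store the fileloc together with the dags_folder of
    # whichever process is doing the serializing (dag processor / scheduler),
    # so a worker with a different DAGS_FOLDER can still compute the relative path.
    return {
        "fileloc": dag.fileloc,
        "_processor_dags_folder": conf.get("core", "dags_folder"),
    }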

depends on #26536 getting merged first

@dstandish dstandish force-pushed the store-relative-loc-in-db branch 5 times, most recently from b1416b8 to ddb9f9b on September 21, 2022 18:24
@dstandish dstandish changed the title from "WIP - Fix airflow tasks run --local when dags_folder differs from that of processor" to "Fix airflow tasks run --local when dags_folder differs from that of processor" on Sep 21, 2022
@dstandish dstandish marked this pull request as ready for review September 21, 2022 22:42
@potiuk
Member

potiuk commented Sep 22, 2022

Needs rebase :). But looks good. @mhenc?

        except ValueError:
            # Not relative to DAGS_FOLDER.
            return path

    @property
    def dag_processor_dags_folder(self):
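The excerpt ends at the property signature. Purely as a sketch of the revision under review (assuming the serializer's folder is stored on the object as _processor_dags_folder, an illustrative name), the body might look like this:

    @property
    def dag_processor_dags_folder(self):
        # Sketch only: prefer the dags_folder recorded by the serializing process,
        # falling back to this process's configured DAGS_FOLDER.
        from airflow import settings

        return self._processor_dags_folder or settings.DAGS_FOLDER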
Collaborator

nit: How does it apply to the processor_subdir field from e.g. DagModel?
Doesn't it have the same meaning?
Maybe we should keep the name consistent?

Contributor Author

Hmm... that's a good question.... I don't know if it has the same meaning. Is it correct to assume that processor_subdir will always be the dags folder of the processor? If so, why didn't they call it dags folder in the first place? What if there is sometimes an optimization to pass the actual dag file path as subdir to the dag processor so that it will just load that specific file instead of all dags in the dags folder?

Contributor Author

Actually... I think processor_subdir is something different. I think the idea is that you can have multiple dag processors running, each looking at a different subdir, which is a subdir of dags_folder. Does that sound right to you?

Contributor Author

From the looks of it, that's why e.g. you have a --subdir option in the airflow dag-processor command (which can be different from DAGS_FOLDER),
and then you have this logic:
https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L508-L509
This logic appears to say, "if I am a standalone dag processor, and I didn't find the dags in my path, then mark them as stale".
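For reference, the shape of that logic is roughly the following (a loose paraphrase of the linked manager.py lines, not the actual Airflow source):

from airflow.models import DagModel

def stale_dag_query(session, standalone_dag_processor: bool, dag_directory: str):
    # Loose paraphrase: a standalone dag processor restricts the "deactivate
    # stale dags" query to DAGs whose recorded processor_subdir matches its
    # own directory, so DAGs parsed by other processors are left alone.
    query = session.query(DagModel).filter(DagModel.is_active)
    if standalone_dag_processor:
        query = query.filter(DagModel.processor_subdir == dag_directory)
    return query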

It does seem a bit weird, though, because if you change the dag processor subdirs later, perhaps some of those dags would never be returned by this query and therefore would never be deactivated?

Contributor Author

Ah, I guess you would know better than most ;)
(screenshot omitted)
Let me know your thoughts.

Contributor Author

> I see that name may be a little confusing, but --subdir just points to the dag folder - the value can be an absolute path and two different processors may point to completely different locations.

What do you mean "just points to the dag folder"? I have looked at the code and it does not seem that there's any logic e.g. to make the subdir agree with the configured dags folder; setting subdir on the dag processor command does not force dags_folder to agree. So fundamentally, they are two different things, right? The dag processor can run without them agreeing, no?

If they are in completely different locations, i.e. not a "subdir" of the dags folder, then the concept of "relative location" of a dag file becomes kind of meaningless, and if you can't use the relative loc, then the worker can't really rely on the ser dag to know where a dag is located.

Collaborator

> to make the subdir agree with configured dags folder. e.g. setting subdir on dag processor command does not force dags_folder to agree. so fundamentally, they are two different things right? the dag processor can run without them agreeing, no?

Right, there is no logic/check; we just pass --subdir to the Dag Processor and it parses the dags from this directory.

> if you can't use relative loc, then the worker can't really rely on ser dag to know where a dag is located.

Well, if you define different sets of workers on different task queues then it may work, but I agree there is no support from the Airflow perspective for such a setup.

But note that dag processor separation (and multiple dag processors) is just the first step towards Airflow multi-tenancy. At some point worker isolation will also be supported.

cc: @potiuk

Member

@potiuk potiuk Sep 26, 2022

I read the thread carefully (sorry it took me some time, I wanted to make sure I am not mixing things). There is some misunderstanding of what --subdir is; I will try to paraphrase my understanding and simply see if we are on the same page.

A bit of context as I understand and remember it :).

The idea of the "--subdir" flag and the way we've implemented it was not to over-complicate it - i.e. to be able to do two things:

Phase 1) separate out the standalone DAG processor from the scheduler (isolate code execution to a different machine that, for example, might be in a different security zone).

Phase 2) be able to split your DAGS folder into separate subdirs, each isolated from the others.

Case 1) you have all DAG file processors run from DAGS_FOLDER (no --subdir flag, equivalent to what we currently have when we have multiple schedulers all parsing dags from the same folder - which is a bit redundant but acceptable; running several of them at the same time was not the main purpose of introducing the standalone DAG processor).

Case 2) you have one (or more) separate DAG processors, each group of them with its own, separate directory to process. No "top level" dag file processor. This is equivalent to having several independent DAG file processors and no "common" one at all. Eventually this might be connected to teams. The main use case here was to make it independent from the actual "airflow" CORE_DAGS_FOLDER:

TEAM_A -> DAG_PROCESSOR_A1, DAG_PROCESSOR_A2 (--subdir /FOLDER_OF_TEAM_A)
TEAM_B -> DAG_PROCESSOR_B1, DAG_PROCESSOR_B2 (--subdir /FOLDER_OF_TEAM_B)

This means that the user might configure it differently depending on the deployment choice they want to have. And yeah, the --subdir name might be misleading in this context; I vaguely recall we had a discussion about it when we chose the parameter name (we can find it I guess to get more context - I can't even remember what my preference was), but --subdir in this case is more about the fact that this is not "all of" the DAGs but a subset of them. I think we chose the name because we decided to reuse the existing --subdir parameter from the local commands, which already has this "misleading" semantics - so that airflow dags test --subdir <ABSOLUTE_PATH> is equivalent to dag-processor --subdir <ABSOLUTE_PATH>.

Currently --subdir in local tasks has this behaviour:

  1. If it is an absolute path, use it as the absolute location of the DAG.
  2. If it contains DAGS_FOLDER (anywhere) -> replace DAGS_FOLDER with the current DAGS_FOLDER - this was fixed by @ashb in 2.2.0 to handle different locations on the scheduler and runners (Support DAGS folder being in different location on scheduler and runners #16860) - it was broken in 2.0 and 2.1 but worked in 1.10.
  3. It can also expand the "user" path (god knows why, but likely historical too).

This is the current process_subdir() method:

def process_subdir(subdir: str | None):
    """Expands path to absolute by replacing 'DAGS_FOLDER', '~', '.', etc."""
    if subdir:
        if not settings.DAGS_FOLDER:
            raise ValueError("DAGS_FOLDER variable in settings should be filled.")
        subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER)
        subdir = os.path.abspath(os.path.expanduser(subdir))
    return subdir
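To make those three behaviours concrete, here is an illustrative use (paths are made up, assuming settings.DAGS_FOLDER is "/opt/airflow/dags"):

process_subdir("DAGS_FOLDER/team_a")  # -> "/opt/airflow/dags/team_a"
process_subdir("/data/other_dags")    # -> "/data/other_dags" (absolute path kept as-is)
process_subdir("~/dags")              # -> "/home/<user>/dags" (user expansion)
process_subdir(None)                  # -> None (passed through unchanged)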

Originally the idea was that they might want to have them independent of the current https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder without relative paths. That's fine, nothing prevents us from doing so, and I believe we handle this perfectly well - we already handled the situation where DagBag is passed an absolute path and it would store the absolute path of the DAG where it comes from. This is for example used in the case of "example_dags" and this is what we wanted to get.

I believe (please correct me if I misunderstood it @dstandish) that this PR here is very similar to what the #16860 change from @ashb was. But while @ashb wanted to resurrect the relative approach for the main DAGS folder, this one brings it to the standalone DAG processor.

This is fine - we did not want to do it originally, we had not thought this might be useful, but when I think of it, it can have some uses - but only if you create a deployment where all the subdirs served by all file processors are subdirs of the "CORE" dags folder (and there is still no top-level parser/processor).

Then you could likely synchronise all your sub-folders as a single volume or git-sync and scan each sub-folder of it through a different group of dag file processors. Originally the idea was that each DAG file processor (group) could have a separate git-sync or volume. But this is an interesting approach where they are isolated for execution but not isolated for syncing. This makes it a bit more difficult to maintain the isolation and syncing (because you have to have a filesystem access hierarchy parallel to the sub-processor hierarchy, or git submodules, to run syncing from multiple repositories), but it's doable and a number of our users already do it - for example Jagex in their Airflow Summit talk from London.

Are we on the same page here :) ?

Contributor Author

@dstandish dstandish Sep 26, 2022

> Are we on the same page here :) ?

Maybe sort of? :) I'm not sure what you're proposing though...

Taking it back down to where the rubber meets the road here... what I'm trying to do here is fix a bug. Currently, if the DAGS_FOLDER of the scheduler/processor differs from that of the k8s executor, then all tasks will fail to run. I think this issue is the same one you say worked in 1.10 but was broken in 2.0 and 2.1 -- it's broken again in 2.4, and that's what I'm trying to fix.

This is easy to repro if you run the scheduler in a local virtualenv and run your k8s pods in Docker Desktop.

The cause is the recent optimization to not "process" dags in airflow tasks run --local but to read from the ser dag. The pod definition remains like this:

  - args:
    - airflow
    - tasks
    - run
    - example_bash_operator
    - runme_0
    - manual__2022-09-23T20:04:52.306715+00:00
    - --local
    - --subdir
    - DAGS_FOLDER

But when that gets converted to tasks run --raw, the subdir becomes the full path (fileloc) of the dag as it is located on the scheduler/processor. I think it breaks down because our mechanism for handling different dags folders is relative_fileloc. But currently the logic to calculate the relative fileloc is based on the current dags folder, which means that if the current process has a different dags folder, it cannot be calculated! So then it defaults to the full filepath as stored in the ser dag and therefore it fails to find the dag.
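Concretely, the failure mode is something like this (an illustrative sketch with made-up paths, not code from Airflow):

from pathlib import Path

# fileloc as recorded by the scheduler/dag processor (under its DAGS_FOLDER)
fileloc = Path("/home/me/airflow/dags/example_bash_operator.py")
# DAGS_FOLDER of the worker pod, which is different
worker_dags_folder = Path("/opt/airflow/dags")

try:
    relative_fileloc = fileloc.relative_to(worker_dags_folder)
except ValueError:
    # Not relative to the worker's DAGS_FOLDER, so the full (scheduler-side)
    # path is used as-is -- and the worker then fails to find the dag file.
    relative_fileloc = fileloc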

So what do we do about it? The relatively straightforward solution is to use the dags_folder of the serializer instead of the dags_folder of the current process when calculating the relative fileloc (or, equivalently, you could actually store the relative fileloc when serializing). Then the actual path can always be reassembled, provided that the dag actually is relative to the dags_folder in all envs (and has the same relative path).
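A minimal sketch of that approach (a hypothetical helper, not the exact code in this PR): compute the relative path against the dags_folder recorded by the serializer, then re-anchor it under the local DAGS_FOLDER.

import os
from pathlib import Path

def resolve_local_fileloc(fileloc: str, serializer_dags_folder: str, local_dags_folder: str) -> str:
    # Hypothetical helper: figure out where the dag file lives locally, given
    # where it lived relative to the dags_folder of the serializing process.
    try:
        rel = Path(fileloc).relative_to(serializer_dags_folder)
    except ValueError:
        # Not under the serializer's dags_folder; fall back to the stored path.
        return fileloc
    return os.fspath(Path(local_dags_folder) / rel)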

So that's why I added dag_processor_dags_folder.

But @mhenc is suggesting, I think, to just use processor_subdir instead. The problem I see with that is that processor_subdir may not always equal dags_folder. And in the case of multiple processors in subdirs of the dags folder, this would not work, I think. The only way it would work is if you had a 1-1 relationship between the dags_folder of the worker and the processor_subdir of the processor. But in that scenario, why not just set the DAGS_FOLDER of your processor to be the subdir?

Fundamentally, I think that currently airflow assumes everything must be in the dags folder, and that dag processors, to the extent they have different paths (subdir), use subdirs of the dags folder. This alone seems QED for the notion that processor_subdir !== dags_folder. As we get closer to multitenancy, it seems likely we'll need to blow up the dags_folder concept; if it's no longer true that there is one dags folder, then the code and language need to reflect that. There may be many dags folders. "relative_fileloc" won't necessarily make sense as a concept (unless it's redefined to mean relative to the dags folder of this "sub-instance"). But for now, processor_subdir and dags_folder seem to be two different things, and DAGS_FOLDER seems to have the best likelihood of giving a reliable answer for relative_fileloc, so it seems worth storing separately.

THAT SAID, perhaps we should make it a "private" attribute, so as not to increase the backcompat surface unnecessarily?

WDYT?

Contributor Author

@potiuk

> I believe (please correct me if I misunderstood it @dstandish) that this PR here was very similar to what the #16860 change from @ashb was. But while @ashb wanted to resurrect the relative approach working for the main DAGS folder, this one was to bring it to the standalone DAG processor.

Not quite... the "change" here is not really a change; it's fixing the 2.4 regression of that same exact feature. It's not specific to the dag processor. This issue appears even when just using the scheduler with a virtualenv and the k8s executor --- no dag processor process required.

@dstandish dstandish force-pushed the store-relative-loc-in-db branch 2 times, most recently from ebfb70e to 9f96aa4 on September 27, 2022 15:27
@dstandish
Contributor Author

OK so... I made it so this is just a private attr on the ser dag object...

So really this just fixes the existing feature without adding any new backcompat promises.

And it is called dags folder, not processor subdir, but, as has been established, the two are not the same thing, and the present attr is always going to tell the truth... so... @potiuk @mhenc does it get your blessing?

Currently, if your dags_folder differs from that used by the dag processor, then tasks run --local will fail to find the dag, because it uses the fileloc value stored in the serialized dag. The behavior of this function is somewhat difficult to test, so I wanted to first just test the bad behavior; next I'll work on a fix and verify that it behaves correctly.
Currently the code uses the dags_folder of the "current" process to calculate the relative fileloc from the full fileloc stored in the serialized dag. If the current dags folder is different from the dags folder configured on the dag processor, then airflow will just use the full path, which in this case will be a bad path. We can fix this by keeping track of the dags_folder from the dag processor that serialized the dag, and using this for figuring out the relative path.
@mhenc
Collaborator

mhenc commented Sep 28, 2022

Yes, looks good to me. Thank you!

@potiuk
Member

potiuk commented Sep 28, 2022

Makes sense - one static check to fix but good to go :)

@dstandish dstandish merged commit c94f978 into apache:main Sep 28, 2022
@dstandish dstandish deleted the store-relative-loc-in-db branch September 28, 2022 22:56
@jedcunningham jedcunningham added this to the Airflow 2.4.2 milestone Sep 29, 2022
@jedcunningham jedcunningham added the type:bug-fix Changelog: Bug Fixes label Sep 29, 2022
ephraimbuddy pushed a commit that referenced this pull request Oct 18, 2022
Fix airflow tasks run --local when dags_folder differs from that of processor (#26509)

Previously the code used the dags_folder of the "current" process (e.g. the celery worker, or k8s executor worker pod) to calculate the relative fileloc based on the full fileloc stored in the serialized dag. But if the worker dags_folder is different from the dags folder configured on the dag processor, then airflow can't calculate the relative path, so it will just use the full path, which in this case will be a bad path. We can fix this by keeping track of the dags_folder from the dag processor that serialized the dag, and using this for figuring out the relative path.

(cherry picked from commit c94f978)
Labels: area:CLI, type:bug-fix