Using the output of an HttpOperator as input for a SQLExecuteQueryOperator when it is executed in chunks #39080
-
Dear, But writing the data from the HttpOperator to the SQLExecuteQueryOperator was not as straightforward as I first thought. I tried the following approaches:
This has led me to the only option I could think of: creating a custom HttpToDBOperator that takes the arguments needed to get the data from the HttpOperator and writes that dataset to the database with the SQLExecuteQueryOperator. Maybe not the cleanest approach, but it worked. Still, I am wondering: is there an option I did not think of?
Replies: 3 comments 9 replies
-
I understand the issue that Kenny has, as we have also run into that "problem" many times. Airflow always executes one mapped operator completely before the next one, even inside a task group. When I first saw task groups, I expected a mapped task group to execute its operators one after another for each mapped input, but that is not the case. What we would like is for a mapped task_group to run all of its operators sequentially for each input, rather than running every input through the first operator and only then feeding all of those outputs to the second one.

It would be even cooler if those operators could be chained and executed on the same worker, so that the second operator could use the result of the previous one without going through XCom. (Correction: that last point is wrong; I don't think we could avoid the XComs.)

As a temporary solution, I am thinking of creating a custom SequentialOperator that takes the operators as parameters and executes them in order: its execute method would call the execute methods of the passed operators (I know there is more to it; this is just a simple explanation). The "problem" with this approach, if it is a problem at all, is that only the SequentialOperator would show up in the Graph. It would be nicer to have this built into Airflow, for example as a boolean parameter sequential on task_group (False by default, of course), so that the TaskGroup knows it has to execute all operators sequentially for each mapped task.

I'm willing to contribute this, but I'm not sure where to start looking, or whether the community would accept it as a feature. I've already looked at the TaskGroup class but don't see how to implement this logic there; I suppose there is more to it than just the TaskGroup.
I also started a thread about this on Slack.
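The SequentialOperator idea described above could look roughly like the following. This is a minimal sketch without Airflow: `MiniOperator`, `Fetch`, and `Load` are hypothetical stand-ins (in Airflow the wrapper would presumably subclass `BaseOperator`), and handing the previous result over via `context["input"]` is an assumption of this sketch, not how real operators receive their arguments.

```python
from typing import Any, Dict, Iterable, List


class MiniOperator:
    """Hypothetical stand-in for an operator: only `execute` matters here."""

    def execute(self, context: Dict[str, Any]) -> Any:
        raise NotImplementedError


class Fetch(MiniOperator):
    """Pretends to fetch two rows for the given chunk number."""

    def execute(self, context: Dict[str, Any]) -> List[int]:
        chunk = context["input"]
        return [chunk * 10, chunk * 10 + 1]


class Load(MiniOperator):
    """Pretends to write rows to a database by appending them to a list."""

    def __init__(self, sink: List[int]) -> None:
        self.sink = sink

    def execute(self, context: Dict[str, Any]) -> int:
        rows = context["input"]
        self.sink.extend(rows)
        return len(rows)


class SequentialOperator(MiniOperator):
    """Runs the wrapped operators in order, feeding each result to the next."""

    def __init__(self, operators: Iterable[MiniOperator]) -> None:
        self.operators = list(operators)

    def execute(self, context: Dict[str, Any]) -> Any:
        result = context.get("input")
        for op in self.operators:
            # Each operator sees the previous operator's result as its input.
            result = op.execute({**context, "input": result})
        return result


sink: List[int] = []
pipeline = SequentialOperator([Fetch(), Load(sink)])
# One pipeline run per mapped input: fetch and load finish for a chunk
# before the next chunk starts.
counts = [pipeline.execute({"input": chunk}) for chunk in (0, 1, 2)]
```

As noted in the comment, a real implementation would need more than this (template rendering, retries, and so on), and only the wrapper would be visible in the Graph view.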
-
Could you provide some reproducible code? I believe that a dynamic mapped task group is what you're looking for. Here are some of my comments on @dabla's Slack thread
I invite you to check the Confluence page where we discussed the dynamic mapped task group feature.
-
My question covered the following steps:
The issue I had:
While I kept trying to get my use case to work, I also tested the pointers from this discussion, but in the end I got it working with my own approach:
The screenshot above shows: