Dynamic task adding mapped instance #31292

Closed
1 of 2 tasks
mfukushima3 opened this issue May 15, 2023 · 11 comments
Labels
affected_version:2.4, area:core, area:dynamic-task-mapping (AIP-42), kind:bug, needs-triage

Comments

@mfukushima3

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Airflow version in Cloud Composer: composer-1.19.13-airflow-2.3.3

I have two DAGs for reading Stripe data that are exactly the same except for their scheduling intervals. First, they get a list of all the accounts in the system. Then, they use that list for a dynamic task to get other Stripe data via connected accounts (for this example, balance transactions).

The first DAG runs weekly and has catchup enabled going back to 2022-11-01. This DAG runs without issues. The second DAG is a copy of the first with two changes: it runs every 15 minutes, and catchup goes back only to the start of the current day.

On some runs of the second (15-minute) DAG, the dynamic task adds a mapped instance with an empty map index. The scheduler then crashes, which brings down the entire environment. The log gives this error:
DETAIL: Key (dag_id, task_id, run_id, map_index)=(<DAG name>, <task name>, scheduled__2023-05-12T19:00:00+00:00, 0) already exists.

This error does not occur in an identical DAG that runs weekly.
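
To illustrate the kind of constraint the error message refers to, here is a minimal sketch using an in-memory SQLite table. The table is a simplified stand-in that models only the four key columns named in the error; it is not Airflow's actual schema, and the DAG and task names are hypothetical. Inserting the same (dag_id, task_id, run_id, map_index) tuple twice is rejected, while a different map_index is accepted.

```python
import sqlite3

# Simplified stand-in for the task instance table: only the four key
# columns from the error message are modeled (an assumption for illustration).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_instance (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL,
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    )
    """
)


def insert_task_instance(dag_id, task_id, run_id, map_index):
    """Insert one row; return False if the composite key already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
                (dag_id, task_id, run_id, map_index),
            )
        return True
    except sqlite3.IntegrityError:
        return False


run_id = "scheduled__2023-05-12T19:00:00+00:00"
first = insert_task_instance("example_dag", "example_task", run_id, 0)
duplicate = insert_task_instance("example_dag", "example_task", run_id, 0)
other_index = insert_task_instance("example_dag", "example_task", run_id, 1)
print(first, duplicate, other_index)  # True False True
```

In the sketch the duplicate insert is caught and reported; in the report above, the equivalent database error reaches the scheduler and it crashes.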

What you think should happen instead

The task should have one fewer mapped instance than is showing.
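
As a rough sketch of the expectation, expanding a task over a list of n items should give n mapped instances with map indexes 0 through n-1. The helper names and account IDs below are hypothetical and exist only to show how an observed set of indexes could be compared against that expectation.

```python
def expected_map_indexes(items):
    """Return the map indexes expected when a task is expanded over `items`."""
    return list(range(len(items)))


def unexpected_map_indexes(observed, items):
    """Return observed map indexes that fall outside the expected range."""
    expected = set(expected_map_indexes(items))
    return sorted(index for index in set(observed) if index not in expected)


accounts = ["acct_a", "acct_b", "acct_c"]  # hypothetical account IDs
indexes = expected_map_indexes(accounts)
extra = unexpected_map_indexes([0, 1, 2, 3], accounts)
print(indexes)  # [0, 1, 2]
print(extra)    # [3]
```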

How to reproduce

from datetime import datetime
import logging

import stripe
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

stripe.api_key = "sk_test_4eC39HqLyjWDarjtT1zdp7dc"

@task()
def get_balance_transactions(account_id):
    context = get_current_context()
    dag_run_id = context["run_id"]
    interval_start = context["data_interval_start"]
    interval_end = context["data_interval_end"]

    created_dict = {"gte": interval_start, "lt": interval_end}
    logging.info(created_dict)

    response = stripe.BalanceTransaction.list(stripe_account=account_id, created=created_dict)

    transactions = []
    for transaction in response.auto_paging_iter():
        transaction_dict = transaction.to_dict()
        transaction_dict.update({"account_id": account_id})
        transactions.append(transaction_dict)
        
    return transactions
        

@task()
def list_of_accounts():
    response = stripe.Account.list()

    accounts = []
    for account in response.auto_paging_iter():
        account_id = account.to_dict()["id"]
        accounts.append(account_id)

    return accounts

default_args = {
    "owner": "owner",
    "start_date": datetime(2023, 5, 11),
}
dag_args = {
    "default_args": default_args,
    "description": "Stripe Payment Intents Events Ingestion",
    "schedule_interval": "*/15 * * * *",
    "catchup": True,
    "max_active_runs": 1,
    "tags": ["stripe", "payment_intents", "events"],
}


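@dag(**dag_args)
def stripe_payment_intents_cdc():
    # Both task functions are defined above in this file.
    accounts = list_of_accounts.override(task_id="get_accounts_list")()

    # Create one mapped task instance per account ID.
    transactions = get_balance_transactions.partial().expand(
        account_id=accounts["return_value"]
    )

    accounts >> transactions


# Instantiate the DAG so that Airflow registers it.
stripe_payment_intents_cdc()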

Operating System

macOS

Versions of Apache Airflow Providers

composer-1.19.13-airflow-2.3.3

Deployment

Google Cloud Composer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@mfukushima3 added the area:core, kind:bug, and needs-triage labels on May 15, 2023
@boring-cyborg

boring-cyborg bot commented May 15, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@rohan472000
Contributor

You can modify your second DAG to ensure that the mapped instances are created correctly. Here is a suggestion for updating your code:

  1. Modify the get_balance_transactions task:

    Add a log statement to verify the values of interval_start and interval_end in the task.

  2. Update the mapping of instances in the second DAG:

    Instead of using expand() to map instances in the dynamic task, use the xcom_push() method to push the list of
    accounts as an XCom value.

    Retrieve the list of accounts in the get_balance_transactions task using ti.xcom_pull().
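
One way to read this suggestion is to process every account inside a single, non-mapped task rather than creating one mapped instance per account. The sketch below shows only that loop in plain Python. `collect_transactions` and `fake_fetch` are hypothetical names; in a real task the account list would come from something like `ti.xcom_pull(task_ids="get_accounts_list")` and the fetch function would wrap the Stripe call. This approach gives up per-account parallelism and retries, so treat it as a workaround rather than a fix.

```python
def collect_transactions(account_ids, fetch_transactions):
    """Fetch transactions for every account in one pass and tag each row."""
    results = []
    for account_id in account_ids:
        for transaction in fetch_transactions(account_id):
            row = dict(transaction)
            row["account_id"] = account_id
            results.append(row)
    return results


def fake_fetch(account_id):
    """Hypothetical stand-in for the Stripe API call used in the issue."""
    return [{"id": f"txn_{account_id}_1"}, {"id": f"txn_{account_id}_2"}]


rows = collect_transactions(["acct_a", "acct_b"], fake_fetch)
print(len(rows))  # 4
```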

@hussein-awala
Member

It can be related to #25060.

We need some information about the failed dag run in order to help you debug the problem:

  • can you provide the dag run start_date? (is it 2023-05-12T19:00:00+00:00 as mentioned in the error?)
  • regarding the failed task, did it fail during the first retry, or were there multiple retries before it failed?

@mfukushima3
Author

@hussein-awala
The DAG run started at 2023-05-12, 23:10:11 UTC. The data interval for the run was:

Start: 2023-05-12, 18:45:00 UTC
End: 2023-05-12, 19:00:00 UTC

However, this issue has happened multiple times during random runs.

It failed on the first retry.

@d2015196

d2015196 commented Jul 13, 2023

We have observed the same issue actually! Here are details of ours:

Apache Airflow version
Other Airflow 2 version (please specify below)

Deployment
Google Cloud Composer (composer-1.20.11-airflow-2.4.3)

What happened
Our scheduler went down. On digging into the logs, we found that it went down because of this error:

Failed to execute task (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey".
Key (dag_id, task_id, run_id, map_index)=(<dag_name>, <task_name>, scheduled__2023-07-12T05:20:00+00:00, 0) already exists.
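
When collecting details for a report like this, it can help to pull the conflicting key out of the log line programmatically. The following is a minimal sketch; the regular expression and the `parse_duplicate_key` helper are illustrative assumptions based only on the message format shown above, and they assume that none of the values contain commas or parentheses.

```python
import re

# Matches the "Key (col, ...)=(val, ...) already exists." detail line.
KEY_PATTERN = re.compile(
    r"Key \((?P<columns>[^)]*)\)=\((?P<values>[^)]*)\) already exists\."
)


def parse_duplicate_key(detail_line):
    """Return a column-to-value dict for the duplicate key, or None."""
    match = KEY_PATTERN.search(detail_line)
    if match is None:
        return None
    columns = [c.strip() for c in match.group("columns").split(",")]
    values = [v.strip() for v in match.group("values").split(",")]
    return dict(zip(columns, values))


line = (
    "Key (dag_id, task_id, run_id, map_index)="
    "(<dag_name>, <task_name>, scheduled__2023-07-12T05:20:00+00:00, 0) "
    "already exists."
)
key = parse_duplicate_key(line)
print(key["run_id"], key["map_index"])
```

The parsed run_id and map_index can then be compared against the task instances shown for that DAG run.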

Full logs: composer_masked_logs.txt

Our DAG has a dynamically mapped task. As with the OP's DAG, it seems the map index is being assigned incorrectly. Our DAG also runs rather frequently, about 6 times per day, and launches 4-10 dynamically mapped task instances.

The strange thing about this is that this error is observed a few times, yet most of the time this DAG runs smoothly, no errors.

What you think should happen instead
The task should be assigned a map index that has not already been assigned.

How to reproduce
I can't share company code, so I am still looking for the best way to reproduce this. Logs are attached.

@eladkal added the area:dynamic-task-mapping (AIP-42) and affected_version:2.4 labels on Aug 7, 2023
@eladkal
Contributor

eladkal commented Aug 7, 2023

I am not sure if we have enough information to investigate the root cause
cc @uranusjr

@uranusjr
Member

uranusjr commented Aug 7, 2023

The issue reported at the top (against 2.3.3) definitely looks identical to #25060. The one below (against 2.4.3) contains too little information to reasonably digest. I would suggest we close this and move the 2.4.3 report to another issue, and clarify there instead.

@eladkal
Contributor

eladkal commented Aug 7, 2023

Thanks!

Closing as completed.

@d2015196 please open a new issue with a full description of the problem and a reproducible example; otherwise we can't investigate.

@eladkal closed this as completed on Aug 7, 2023
@d2015196

d2015196 commented Aug 8, 2023

@eladkal I will try to reproduce this in my environment and open a new issue.

@saurabhkhera

saurabhkhera commented Dec 19, 2023

@eladkal @d2015196 we are facing the same problem on Airflow 2.4.3. Has this issue been resolved in any later version of Airflow?

@potiuk
Member

potiuk commented Dec 19, 2023

@eladkal @d2015196 we are facing the same problem on Airflow 2.4.3. Has this issue been resolved in any later version of Airflow?

Until you open a new issue with your logs and circumstances, I am afraid we will not even know whether this is the same issue. Our state of knowledge has not changed since then, and without evidence and details of what you are experiencing it is simply impossible to answer your question.
