diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 060b3209a9a3d..0e252ad5bb35b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -225,6 +225,8 @@ class DAG(LoggingMixin):
     :type tags: List[str]
     """

+    # pylint: disable=too-many-instance-attributes,too-many-public-methods
+
     _comps = {
         'dag_id',
         'task_ids',
@@ -268,9 +270,10 @@ def __init__(
         jinja_environment_kwargs: Optional[Dict] = None,
         render_template_as_native_obj: bool = False,
         tags: Optional[List[str]] = None,
-    ):
+    ):  # pylint: disable=too-many-arguments,too-many-locals,too-many-statements
         from airflow.utils.task_group import TaskGroup

+        super().__init__()
         self.user_defined_macros = user_defined_macros
         self.user_defined_filters = user_defined_filters
         self.default_args = copy.deepcopy(default_args or {})
@@ -385,7 +388,7 @@ def __repr__(self):
         return f"<DAG: {self.dag_id}>"

     def __eq__(self, other):
-        if type(self) == type(other):
+        if type(self) == type(other):  # pylint: disable=unidiomatic-typecheck
             # Use getattr() instead of __dict__ as __dict__ doesn't return
             # correct values for properties.
             return all(getattr(self, c, None) == getattr(other, c, None) for c in self._comps)
@@ -399,12 +402,12 @@ def __lt__(self, other):

     def __hash__(self):
         hash_components = [type(self)]
-        for c in self._comps:
+        for comp in self._comps:
             # task_ids returns a list and lists can't be hashed
-            if c == 'task_ids':
+            if comp == 'task_ids':
                 val = tuple(self.task_dict.keys())
             else:
-                val = getattr(self, c, None)
+                val = getattr(self, comp, None)
             try:
                 hash(val)
                 hash_components.append(val)
@@ -455,6 +458,7 @@ def date_range(
         num: Optional[int] = None,
         end_date: Optional[datetime] = timezone.utcnow(),
     ) -> List[datetime]:
+        """Get a list of dates based on a start date, end date and num"""
         if num is not None:
             end_date = None
         return utils_date_range(
@@ -500,11 +504,12 @@ def following_schedule(self, dttm):
             else:
                 # absolute (e.g. 3 AM)
                 naive = cron.get_next(datetime)
-                tz = self.timezone
+                tz = self.timezone  # pylint: disable=invalid-name
                 following = timezone.make_aware(naive, tz)
             return timezone.convert_to_utc(following)
         elif self.normalized_schedule_interval is not None:
             return timezone.convert_to_utc(dttm + self.normalized_schedule_interval)
+        return None

     def previous_schedule(self, dttm):
         """
@@ -528,11 +533,12 @@ def previous_schedule(self, dttm):
             else:
                 # absolute (e.g. 3 AM)
                 naive = cron.get_prev(datetime)
-                tz = self.timezone
+                tz = self.timezone  # pylint: disable=invalid-name
                 previous = timezone.make_aware(naive, tz)
             return timezone.convert_to_utc(previous)
         elif self.normalized_schedule_interval is not None:
             return timezone.convert_to_utc(dttm - self.normalized_schedule_interval)
+        return None

     def next_dagrun_info(
         self,
@@ -680,12 +686,14 @@ def normalize_schedule(self, dttm):

     @provide_session
     def get_last_dagrun(self, session=None, include_externally_triggered=False):
+        """Get the latest dag run of this dag"""
         return get_last_dagrun(
             self.dag_id, session=session, include_externally_triggered=include_externally_triggered
         )

     @provide_session
     def has_dag_runs(self, session=None, include_externally_triggered=True) -> bool:
+        """Check whether this dag has any dag runs"""
         return (
             get_last_dagrun(
                 self.dag_id, session=session, include_externally_triggered=include_externally_triggered
@@ -694,23 +702,24 @@ def has_dag_runs(self, session=None, include_externally_triggered=True) -> bool
         )

     @property
-    def dag_id(self) -> str:
+    def dag_id(self) -> str:  # pylint: disable=missing-function-docstring
         return self._dag_id

     @dag_id.setter
-    def dag_id(self, value: str) -> None:
+    def dag_id(self, value: str) -> None:  # pylint: disable=missing-function-docstring
         self._dag_id = value

     @property
     def full_filepath(self) -> str:
+        """Dag location"""
         return self._full_filepath

     @full_filepath.setter
-    def full_filepath(self, value) -> None:
+    def full_filepath(self, value) -> None:  # pylint: disable=missing-function-docstring
         self._full_filepath = value

     @property
-    def concurrency(self) -> int:
+    def concurrency(self) -> int:  # pylint: disable=missing-function-docstring
         # TODO: Remove in Airflow 3.0
         warnings.warn(
             "The 'DAG.concurrency' attribute is deprecated. Please use 'DAG.max_active_tasks'.",
Please use 'DAG.max_active_tasks'.", @@ -720,39 +729,51 @@ def concurrency(self) -> int: return self._max_active_tasks @concurrency.setter - def concurrency(self, value: int): + def concurrency(self, value: int): # pylint: disable=missing-function-docstring self._max_active_tasks = value @property def max_active_tasks(self) -> int: + """ + Maximum number of running task instances, + if the num of running task instances beyond this limitation, + the scheduler won't queue the task instance, and the worker won't execute it too + """ return self._max_active_tasks @max_active_tasks.setter def max_active_tasks(self, value: int): + """Set max active tasks limit""" self._max_active_tasks = value @property def access_control(self): + """Get specified DAG-level actions, e.g., + "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}""" return self._access_control @access_control.setter - def access_control(self, value): + def access_control(self, value): # pylint: disable=missing-function-docstring self._access_control = DAG._upgrade_outdated_dag_access_control(value) @property - def description(self) -> Optional[str]: + def description(self) -> Optional[str]: # pylint: disable=missing-function-docstring return self._description @property def default_view(self) -> str: + """ + The default DAG View + coule be one of tree, graph, duration, gantt, landing_times + """ return self._default_view @property - def pickle_id(self) -> Optional[int]: + def pickle_id(self) -> Optional[int]: # pylint: disable=missing-function-docstring return self._pickle_id @pickle_id.setter - def pickle_id(self, value: int) -> None: + def pickle_id(self, value: int) -> None: # pylint: disable=missing-function-docstring self._pickle_id = value def param(self, name: str, default=None) -> DagParam: @@ -767,26 +788,27 @@ def param(self, name: str, default=None) -> DagParam: @property def tasks(self) -> List[BaseOperator]: + """Get tasks of this dag as a list""" return list(self.task_dict.values()) @tasks.setter - def tasks(self, val): + def tasks(self, val): # pylint: disable=missing-function-docstring raise AttributeError('DAG.tasks can not be modified. 

     @property
-    def task_ids(self) -> List[str]:
+    def task_ids(self) -> List[str]:  # pylint: disable=missing-function-docstring
         return list(self.task_dict.keys())

     @property
     def task_group(self) -> "TaskGroup":
+        """A collection of related tasks"""
         return self._task_group

     @property
     def filepath(self) -> str:
         """File location of where the dag object is instantiated"""
-        fn = self.full_filepath.replace(settings.DAGS_FOLDER + '/', '')
-        fn = fn.replace(os.path.dirname(__file__) + '/', '')
-        return fn
+        filepath = self.full_filepath.replace(settings.DAGS_FOLDER + '/', '')
+        return filepath.replace(os.path.dirname(__file__) + '/', '')

     @property
     def folder(self) -> str:
@@ -805,6 +827,7 @@ def owner(self) -> str:

     @property
     def allow_future_exec_dates(self) -> bool:
+        """Whether it is allowed to run task instances with future execution dates"""
         return settings.ALLOW_FUTURE_EXEC_DATES and self.schedule_interval is None

     @provide_session
@@ -896,7 +919,7 @@ def handle_callback(self, dagrun, success=True, reason=None, session=None):
             context.update({'reason': reason})
             try:
                 callback(context)
-            except Exception:
+            except Exception:  # pylint: disable=broad-except
                 self.log.exception("failed to invoke dag state update callback")
                 Stats.incr("dag.callback_exceptions")

@@ -925,6 +948,7 @@ def get_num_active_runs(self, external_trigger=None, session=None):
         :return: number greater than 0 for active dag runs
         """
         # .count() is inefficient
+        # pylint: disable=comparison-with-callable
         query = (
             session.query(func.count())
             .filter(DagRun.dag_id == self.dag_id)
@@ -1020,8 +1044,9 @@ def subdags(self):
         return subdag_lst

     def resolve_template_files(self):
-        for t in self.tasks:
-            t.resolve_template_files()
+        """Get the content of files for template_field / template_ext"""
+        for task in self.tasks:
+            task.resolve_template_files()

     def get_template_env(self) -> jinja2.Environment:
         """Build a Jinja2 environment."""
@@ -1064,6 +1089,7 @@ def set_dependency(self, upstream_task_id, downstream_task_id):
     def get_task_instances(
         self, start_date=None, end_date=None, state=None, session=None
     ) -> Iterable[TaskInstance]:
+        """Get task instances of this dag, filtered by start_date, end_date and state"""
         if not start_date:
             start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time()))
@@ -1096,7 +1122,7 @@ def _get_task_instances(
         include_subdags: bool,
         include_parentdag: bool,
         include_dependent_dags: bool,
-        exclude_task_ids: Collection[str],
+        exclude_task_ids: Collection[str],  # pylint: disable=unsubscriptable-object
         as_pk_tuple: Literal[True],
         session: Session,
         dag_bag: "DagBag" = None,
@@ -1118,7 +1144,7 @@ def _get_task_instances(
         include_parentdag: bool,
         include_dependent_dags: bool,
         as_pk_tuple: Literal[False],
-        exclude_task_ids: Collection[str],
+        exclude_task_ids: Collection[str],  # pylint: disable=unsubscriptable-object
         session: Session,
         dag_bag: "DagBag" = None,
         recursion_depth: int = 0,
@@ -1127,7 +1153,7 @@ def _get_task_instances(
     ) -> Iterable[TaskInstance]:
         ...  # pragma: no cover

-    def _get_task_instances(
+    def _get_task_instances(  # pylint: disable=too-many-locals,too-many-branches,too-many-statements
         self,
         *,
         task_ids,
@@ -1138,7 +1164,7 @@ def _get_task_instances(
         include_parentdag: bool,
         include_dependent_dags: bool,
         as_pk_tuple: bool,
-        exclude_task_ids: Collection[str],
+        exclude_task_ids: Collection[str],  # pylint: disable=unsubscriptable-object
         session: Session,
         dag_bag: "DagBag" = None,
         recursion_depth: int = 0,
@@ -1164,9 +1190,11 @@ def _get_task_instances(
         if include_subdags:
             # Crafting the right filter for dag_id and task_ids combo
             conditions = []
-            for dag in self.subdags + [self]:
+            for dag in self.subdags + [self]:  # pylint: disable=redefined-outer-name
                 conditions.append(
+                    # pylint: disable=no-member
                     (TaskInstance.dag_id == dag.dag_id) & TaskInstance.task_id.in_(dag.task_ids)
+                    # pylint: enable=no-member
                 )
             tis = tis.filter(or_(*conditions))
         else:
@@ -1181,7 +1209,7 @@ def _get_task_instances(
             end_date = end_date or timezone.utcnow()
             tis = tis.filter(TaskInstance.execution_date <= end_date)

-        if state:
+        if state:  # pylint: disable=too-many-nested-blocks
             if isinstance(state, str):
                 tis = tis.filter(TaskInstance.state == state)
             elif len(state) == 1:
@@ -1207,7 +1235,7 @@ def _get_task_instances(
                 include_downstream=True,
             )
             result.update(
-                p_dag._get_task_instances(
+                p_dag._get_task_instances(  # pylint: disable=protected-access
                     task_ids=task_ids,
                     start_date=start_date,
                     end_date=end_date,
@@ -1225,7 +1253,7 @@ def _get_task_instances(
                 )
             )

-        if include_dependent_dags:
+        if include_dependent_dags:  # pylint: disable=too-many-nested-blocks
             # Recursively find external tasks indicated by ExternalTaskMarker
             from airflow.sensors.external_task import ExternalTaskMarker

@@ -1281,7 +1309,7 @@ def _get_task_instances(
                 include_downstream=True,
             )
             result.update(
-                downstream._get_task_instances(
+                downstream._get_task_instances(  # pylint: disable=protected-access
                     task_ids=None,
                     start_date=tii.execution_date,
                     end_date=tii.execution_date,
@@ -1359,7 +1387,7 @@ def topological_sort(self, include_subdag_tasks: bool = False):
             return tuple(graph_sorted)

         # Run until the unsorted graph is empty.
-        while graph_unsorted:
+        while graph_unsorted:  # pylint: disable=too-many-nested-blocks
             # Go through each of the node/edges pairs in the unsorted
             # graph. If a set of edges doesn't contain any nodes that
             # haven't been resolved, that is, that are still in the
@@ -1399,6 +1427,7 @@ def set_dag_runs_state(
         end_date: Optional[datetime] = None,
         dag_ids: List[str] = None,
     ) -> None:
+        """Update the state of dag runs filtered by dag_ids, start_date and end_date"""
         warnings.warn(
             "This method is deprecated and will be removed in a future version.",
             DeprecationWarning,
@@ -1431,7 +1460,7 @@ def clear(
         max_recursion_depth=None,
         dag_bag=None,
         exclude_task_ids: FrozenSet[str] = frozenset({}),
-    ):
+    ):  # pylint: disable=too-many-arguments,too-many-locals,too-many-branches
         """
         Clears a set of task instances associated with the current dag for
         a specified date range.
@@ -1551,9 +1580,10 @@ def clear_dags(
         include_parentdag=False,
         dag_run_state=State.RUNNING,
         dry_run=False,
-    ):
+    ):  # pylint: disable=too-many-arguments
+        """Clear the task instances so that they can be rescheduled"""
         all_tis = []
-        for dag in dags:
+        for dag in dags:  # pylint: disable=redefined-outer-name
             tis = dag.clear(
                 start_date=start_date,
                 end_date=end_date,
@@ -1611,7 +1641,7 @@ def __deepcopy__(self, memo):
         result.user_defined_filters = self.user_defined_filters
         result.params = self.params
         if hasattr(self, '_log'):
-            result._log = self._log
+            result._log = self._log  # pylint: disable=attribute-defined-outside-init
         return result

     def sub_dag(self, *args, **kwargs):
@@ -1646,7 +1676,9 @@ def partial_subset(
         # deep-copying self.task_dict and self._task_group takes a long time, and we don't want all
         # the tasks anyway, so we copy the tasks manually later
         memo = {id(self.task_dict): None, id(self._task_group): None}
+        # pylint: disable=redefined-outer-name
         dag = copy.deepcopy(self, memo)  # type: ignore
+        # pylint: enable=redefined-outer-name

         if isinstance(task_ids_or_regex, (str, RePatternType)):
             matched_tasks = [t for t in self.tasks if re.findall(task_ids_or_regex, t.task_id)]
@@ -1654,13 +1686,13 @@ def partial_subset(
             matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex]

         also_include = []
-        for t in matched_tasks:
+        for task in matched_tasks:
             if include_downstream:
-                also_include += t.get_flat_relatives(upstream=False)
+                also_include += task.get_flat_relatives(upstream=False)
             if include_upstream:
-                also_include += t.get_flat_relatives(upstream=True)
+                also_include += task.get_flat_relatives(upstream=True)
             elif include_direct_upstream:
-                also_include += t.upstream_list
+                also_include += task.upstream_list

         # Compiling the unique list of tasks that made the cut
         # Make sure to not recursively deepcopy the dag while copying the task
@@ -1673,7 +1705,7 @@ def filter_task_group(group, parent_group):
             """Exclude tasks not included in the subdag from the given TaskGroup."""
             copied = copy.copy(group)
             copied.used_group_ids = set(copied.used_group_ids)
-            copied._parent_group = parent_group
+            copied._parent_group = parent_group  # pylint: disable=protected-access

             copied.children = {}

@@ -1692,7 +1724,7 @@ def filter_task_group(group, parent_group):

             return copied

-        dag._task_group = filter_task_group(self._task_group, None)
+        dag._task_group = filter_task_group(self._task_group, None)  # pylint: disable=protected-access

         # Removing upstream/downstream references to tasks and TaskGroups that did not make
         # the cut.
@@ -1703,11 +1735,13 @@ def filter_task_group(group, parent_group):
             group.upstream_task_ids = group.upstream_task_ids.intersection(dag.task_dict.keys())
             group.downstream_task_ids = group.downstream_task_ids.intersection(dag.task_dict.keys())

-        for t in dag.tasks:
+        for task in dag.tasks:
             # Removing upstream/downstream references to tasks that did not
             # make the cut
-            t._upstream_task_ids = t.upstream_task_ids.intersection(dag.task_dict.keys())
-            t._downstream_task_ids = t.downstream_task_ids.intersection(dag.task_dict.keys())
+            # pylint: disable=protected-access
+            task._upstream_task_ids = task.upstream_task_ids.intersection(dag.task_dict.keys())
+            task._downstream_task_ids = task.downstream_task_ids.intersection(dag.task_dict.keys())
+            # pylint: enable=protected-access

         if len(dag.tasks) < len(self.tasks):
             dag.partial = True
@@ -1715,60 +1749,76 @@ def filter_task_group(group, parent_group):
         return dag

     def has_task(self, task_id: str):
+        """Check whether the task specified by task_id belongs to this dag"""
         return task_id in (t.task_id for t in self.tasks)

     def get_task(self, task_id: str, include_subdags: bool = False) -> BaseOperator:
+        """
+        Get the task with the given task_id from this dag;
+        if include_subdags is True, search the associated subdags too
+        """
         if task_id in self.task_dict:
             return self.task_dict[task_id]
         if include_subdags:
-            for dag in self.subdags:
+            for dag in self.subdags:  # pylint: disable=redefined-outer-name
                 if task_id in dag.task_dict:
                     return dag.task_dict[task_id]
         raise TaskNotFound(f"Task {task_id} not found")

-    def pickle_info(self):
-        d = {}
-        d['is_picklable'] = True
+    def pickle_info(self):  # pylint: disable=missing-function-docstring
+        warnings.warn(
+            "This method is deprecated and will be removed in a future version.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
+        pickle_info = {}
+        pickle_info['is_picklable'] = True
         try:
             dttm = timezone.utcnow()
             pickled = pickle.dumps(self)
-            d['pickle_len'] = len(pickled)
-            d['pickling_duration'] = str(timezone.utcnow() - dttm)
-        except Exception as e:
+            pickle_info['pickle_len'] = len(pickled)
+            pickle_info['pickling_duration'] = str(timezone.utcnow() - dttm)
+        except Exception as e:  # pylint: disable=broad-except
             self.log.debug(e)
-            d['is_picklable'] = False
-            d['stacktrace'] = traceback.format_exc()
-        return d
+            pickle_info['is_picklable'] = False
+            pickle_info['stacktrace'] = traceback.format_exc()
+        return pickle_info

     @provide_session
     def pickle(self, session=None) -> DagPickle:
-        dag = session.query(DagModel).filter(DagModel.dag_id == self.dag_id).first()
-        dp = None
+        """Get or generate the dag pickle"""
+        dag = (  # pylint: disable=redefined-outer-name
+            session.query(DagModel).filter(DagModel.dag_id == self.dag_id).first()
+        )
+        dag_pickle = None
         if dag and dag.pickle_id:
-            dp = session.query(DagPickle).filter(DagPickle.id == dag.pickle_id).first()
-        if not dp or dp.pickle != self:
-            dp = DagPickle(dag=self)
-            session.add(dp)
-            self.last_pickled = timezone.utcnow()
+            dag_pickle = session.query(DagPickle).filter(DagPickle.id == dag.pickle_id).first()
+        if not dag_pickle or dag_pickle.pickle != self:
+            dag_pickle = DagPickle(dag=self)
+            session.add(dag_pickle)
+            self.last_pickled = timezone.utcnow()  # pylint: disable=attribute-defined-outside-init
             session.commit()
-            self.pickle_id = dp.id
+            self.pickle_id = dag_pickle.id

-        return dp
+        return dag_pickle

     def tree_view(self) -> None:
         """Print an ASCII tree representation of the DAG."""

         def get_downstream(task, level=0):
+            """Print a task's downstream tasks recursively"""
             print((" " * level * 4) + str(task))
             level += 1
-            for t in task.downstream_list:
-                get_downstream(t, level)
+            for task in task.downstream_list:  # pylint: disable=redefined-argument-from-local
+                get_downstream(task, level)

-        for t in self.roots:
-            get_downstream(t)
+        for task in self.roots:
+            get_downstream(task)

     @property
     def task(self):
+        """Return a decorator to generate a task belonging to this dag"""
         from airflow.decorators import task

         return functools.partial(task, dag=self)
@@ -1808,7 +1858,7 @@ def add_task(self, task):
         # Add task_id to used_group_ids to prevent group_id and task_id collisions.
         self._task_group.used_group_ids.add(task.task_id)

-        self.task_count = len(self.task_dict)
+        self.task_count = len(self.task_dict)  # pylint: disable=attribute-defined-outside-init

     def add_tasks(self, tasks):
         """
@@ -1833,10 +1883,10 @@ def run(
         pool=None,
         delay_on_limit_secs=1.0,
         verbose=False,
-        conf=None,
+        conf=None,  # pylint: disable=redefined-outer-name
         rerun_failed_tasks=False,
         run_backwards=False,
-    ):
+    ):  # pylint: disable=too-many-arguments
         """
         Runs the DAG.

@@ -1916,12 +1966,12 @@ def create_dagrun(
         run_id: Optional[str] = None,
         start_date: Optional[datetime] = None,
         external_trigger: Optional[bool] = False,
-        conf: Optional[dict] = None,
+        conf: Optional[dict] = None,  # pylint: disable=redefined-outer-name
         run_type: Optional[DagRunType] = None,
         session=None,
         dag_hash: Optional[str] = None,
         creating_job_id: Optional[int] = None,
-    ):
+    ):  # pylint: disable=too-many-arguments
         """
         Creates a dag run from this dag including the tasks associated with this dag.
         Returns the dag run.
@@ -1985,7 +2035,7 @@ def create_dagrun(

     @classmethod
     @provide_session
-    def bulk_sync_to_db(cls, dags: Collection["DAG"], session=None):
+    def bulk_sync_to_db(cls, dags: Collection["DAG"], session=None):  # pylint: disable=unsubscriptable-object
         """This method is deprecated in favor of bulk_write_to_db"""
         warnings.warn(
             "This method is deprecated and will be removed in a future version. Please use bulk_write_to_db",
@@ -1996,7 +2046,9 @@ def bulk_sync_to_db(cls, dags: Collection["DAG"], session=None):

     @classmethod
     @provide_session
-    def bulk_write_to_db(cls, dags: Collection["DAG"], session=None):
+    def bulk_write_to_db(
+        cls, dags: Collection["DAG"], session=None  # pylint: disable=unsubscriptable-object
+    ):
         """
         Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB, including
         calculated fields.
@@ -2025,7 +2077,7 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=None):

         for missing_dag_id in missing_dag_ids:
             orm_dag = DagModel(dag_id=missing_dag_id)
-            dag = dag_by_ids[missing_dag_id]
+            dag = dag_by_ids[missing_dag_id]  # pylint: disable=redefined-outer-name
             if dag.is_paused_upon_creation is not None:
                 orm_dag.is_paused = dag.is_paused_upon_creation
             orm_dag.tags = []
@@ -2088,13 +2140,14 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=None):
                 if orm_tag.name not in orm_dag.tags:
                     session.delete(orm_tag)
                     orm_dag.tags.remove(orm_tag)
-            if dag.tags:
-                orm_tag_names = [t.name for t in orm_dag.tags]
-                for dag_tag in list(dag.tags):
-                    if dag_tag not in orm_tag_names:
-                        dag_tag_orm = DagTag(name=dag_tag, dag_id=dag.dag_id)
-                        orm_dag.tags.append(dag_tag_orm)
-                        session.add(dag_tag_orm)
+            if not dag.tags:
+                continue
+            orm_tag_names = [t.name for t in orm_dag.tags]
+            for dag_tag in list(dag.tags):
+                if dag_tag not in orm_tag_names:
+                    dag_tag_orm = DagTag(name=dag_tag, dag_id=dag.dag_id)
+                    orm_dag.tags.append(dag_tag_orm)
+                    session.add(dag_tag_orm)

         if settings.STORE_DAG_CODE:
             DagCode.bulk_sync_to_db([dag.fileloc for dag in orm_dags])
@@ -2137,7 +2190,10 @@ def deactivate_unknown_dags(active_dag_ids, session=None):
         """
         if len(active_dag_ids) == 0:
             return
-        for dag in session.query(DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all():
+
+        for dag in (  # pylint: disable=redefined-outer-name
+            session.query(DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all()
+        ):
             dag.is_active = False
             session.merge(dag)
             session.commit()
@@ -2154,7 +2210,7 @@ def deactivate_stale_dags(expiration_date, session=None):
         :type expiration_date: datetime
         :return: None
         """
-        for dag in (
+        for dag in (  # pylint: disable=redefined-outer-name
             session.query(DagModel)
             .filter(DagModel.last_parsed_time < expiration_date, DagModel.is_active)
             .all()
@@ -2261,7 +2317,7 @@ class DagTag(Base):
     dag_id = Column(String(ID_LEN), ForeignKey('dag.dag_id'), primary_key=True)

     def __repr__(self):
-        return self.name
+        return f"{self.name}"


 class DagModel(Base):
@@ -2344,21 +2400,24 @@ def __repr__(self):
         return f"<DAG: {self.dag_id}>"

     @property
-    def timezone(self):
+    def timezone(self):  # pylint: disable=missing-function-docstring
         return settings.TIMEZONE

     @staticmethod
     @provide_session
     def get_dagmodel(dag_id, session=None):
+        """Get the dag model object containing DAG properties from the db"""
         return session.query(DagModel).filter(DagModel.dag_id == dag_id).first()

     @classmethod
     @provide_session
     def get_current(cls, dag_id, session=None):
+        """Get the dag model from the db for the given dag_id"""
         return session.query(cls).filter(cls.dag_id == dag_id).first()

     @provide_session
     def get_last_dagrun(self, session=None, include_externally_triggered=False):
+        """Get the latest dag run, or None if there was none"""
         return get_last_dagrun(
             self.dag_id, session=session, include_externally_triggered=include_externally_triggered
         )
@@ -2393,6 +2452,7 @@ def get_default_view(self) -> str:

     @property
     def safe_dag_id(self):
+        """The dag_id with each . replaced by __dot__"""
is replaced by __dot__""" return self.dag_id.replace('.', '__dot__') @provide_session @@ -2427,22 +2487,20 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None): log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__) dag_models = session.query(cls).all() - try: - for dag_model in dag_models: - if dag_model.fileloc is not None: - if correct_maybe_zipped(dag_model.fileloc) not in alive_dag_filelocs: - dag_model.is_active = False - else: - # If is_active is set as False and the DAG File still exists - # Change is_active=True - if not dag_model.is_active: - dag_model.is_active = True - else: - continue - session.commit() - except Exception: - session.rollback() - raise + # there is already try in provide_session, + # the try here could be removed + for dag_model in dag_models: + if dag_model.fileloc is None: + continue + + if correct_maybe_zipped(dag_model.fileloc) not in alive_dag_filelocs: + dag_model.is_active = False + else: + # If is_active is set as False and the DAG File still exists + # Change is_active=True + if not dag_model.is_active: + dag_model.is_active = True + session.commit() @classmethod def dags_needing_dagruns(cls, session: Session): @@ -2470,7 +2528,10 @@ def dags_needing_dagruns(cls, session: Session): return with_row_locks(query, of=cls, session=session, **skip_locked(session=session)) def calculate_dagrun_date_fields( - self, dag: DAG, most_recent_dag_run: Optional[pendulum.DateTime], active_runs_of_dag: int + self, + dag: DAG, # pylint: disable=redefined-outer-name + most_recent_dag_run: Optional[pendulum.DateTime], + active_runs_of_dag: int, ) -> None: """ Calculate ``next_dagrun`` and `next_dagrun_create_after`` @@ -2536,7 +2597,7 @@ def factory(*args, **kwargs): f_kwargs[name] = dag_obj.param(name, value) # set file location to caller source path - back = sys._getframe().f_back + back = sys._getframe().f_back # pylint: disable=protected-access dag_obj.fileloc = back.f_code.co_filename if back else "" # Invoke function to create operators in the DAG scope. @@ -2584,13 +2645,15 @@ class DagContext: _previous_context_managed_dags: List[DAG] = [] @classmethod - def push_context_managed_dag(cls, dag: DAG): + def push_context_managed_dag(cls, dag: DAG): # pylint: disable=redefined-outer-name + # pylint: disable=missing-function-docstring if cls._context_managed_dag: cls._previous_context_managed_dags.append(cls._context_managed_dag) cls._context_managed_dag = dag @classmethod def pop_context_managed_dag(cls) -> Optional[DAG]: + # pylint: disable=missing-function-docstring old_dag = cls._context_managed_dag if cls._previous_context_managed_dags: cls._context_managed_dag = cls._previous_context_managed_dags.pop() @@ -2600,4 +2663,5 @@ def pop_context_managed_dag(cls) -> Optional[DAG]: @classmethod def get_current_dag(cls) -> Optional[DAG]: + # pylint: disable=missing-function-docstring return cls._context_managed_dag diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt index 7a6ef29a98c3c..087d9282c70a4 100644 --- a/scripts/ci/pylint_todo.txt +++ b/scripts/ci/pylint_todo.txt @@ -1,4 +1,3 @@ -./airflow/models/dag.py ./airflow/models/dagrun.py ./airflow/www/utils.py ./airflow/configuration.py