Skip to content

Commit

Permalink
Update old style typing (#26872)
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrejeambrun committed Oct 27, 2022
1 parent b757bfa commit 9ab1a6a
Show file tree
Hide file tree
Showing 228 changed files with 396 additions and 1,008 deletions.
3 changes: 1 addition & 2 deletions airflow/api/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from __future__ import annotations

from importlib import import_module
from typing import Any

from airflow import api
from airflow.api.client.api_client import Client
Expand All @@ -28,7 +27,7 @@

def get_current_api_client() -> Client:
"""Return current API Client based on current Airflow configuration"""
api_module = import_module(conf.get_mandatory_value('cli', 'api_client')) # type: Any
api_module = import_module(conf.get_mandatory_value('cli', 'api_client'))
auth_backends = api.load_auth()
session = None
for backend in auth_backends:
Expand Down
3 changes: 0 additions & 3 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,6 @@ def getsection(self, section: str) -> ConfigOptionsDictType | None:
as required.
:param section: section from the config
:rtype: dict
"""
if not self.has_section(section) and not self.airflow_defaults.has_section(section):
return None
Expand Down Expand Up @@ -921,7 +920,6 @@ def as_dict(
:param include_secret: Should the result of calling any *_secret config be
set (True, default), or should the _secret options be left as the
path to get the secret from (False)
:rtype: Dict[str, Dict[str, str]]
:return: Dictionary, where the key is the name of the section and the content is
the dictionary with the name of the parameter and its value.
"""
Expand Down Expand Up @@ -1087,7 +1085,6 @@ def _filter_by_source(
Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
:param getter_func: A callback function that gets the user configured
override value for a particular sensitive_config_values config.
:rtype: None
:return: None, the given config_sources is filtered if necessary,
otherwise untouched.
"""
Expand Down
31 changes: 11 additions & 20 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,89 +845,80 @@ def _log_file_processing_stats(self, known_file_paths):

self.log.info(log_str)

def get_pid(self, file_path):
def get_pid(self, file_path) -> int | None:
"""
:param file_path: the path to the file that's being processed
:return: the PID of the process processing the given file or None if
the specified file is not being processed
:rtype: int
"""
if file_path in self._processors:
return self._processors[file_path].pid
return None

def get_all_pids(self):
def get_all_pids(self) -> list[int]:
"""
Get all pids.
:return: a list of the PIDs for the processors that are running
:rtype: List[int]
"""
return [x.pid for x in self._processors.values()]

def get_last_runtime(self, file_path):
def get_last_runtime(self, file_path) -> float | None:
"""
:param file_path: the path to the file that was processed
:return: the runtime (in seconds) of the process of the last run, or
None if the file was never processed.
:rtype: float
"""
stat = self._file_stats.get(file_path)
return stat.last_duration.total_seconds() if stat and stat.last_duration else None

def get_last_dag_count(self, file_path):
def get_last_dag_count(self, file_path) -> int | None:
"""
:param file_path: the path to the file that was processed
:return: the number of dags loaded from that file, or None if the file
was never processed.
:rtype: int
"""
stat = self._file_stats.get(file_path)
return stat.num_dags if stat else None

def get_last_error_count(self, file_path):
def get_last_error_count(self, file_path) -> int | None:
"""
:param file_path: the path to the file that was processed
:return: the number of import errors from processing, or None if the file
was never processed.
:rtype: int
"""
stat = self._file_stats.get(file_path)
return stat.import_errors if stat else None

def get_last_finish_time(self, file_path):
def get_last_finish_time(self, file_path) -> datetime | None:
"""
:param file_path: the path to the file that was processed
:return: the finish time of the process of the last run, or None if the
file was never processed.
:rtype: datetime
"""
stat = self._file_stats.get(file_path)
return stat.last_finish_time if stat else None

def get_start_time(self, file_path):
def get_start_time(self, file_path) -> datetime | None:
"""
:param file_path: the path to the file that's being processed
:return: the start time of the process that's processing the
specified file or None if the file is not currently being processed
:rtype: datetime
"""
if file_path in self._processors:
return self._processors[file_path].start_time
return None

def get_run_count(self, file_path):
def get_run_count(self, file_path) -> int:
"""
:param file_path: the path to the file that's being processed
:return: the number of times the given file has been parsed
:rtype: int
"""
stat = self._file_stats.get(file_path)
return stat.run_count if stat else 0

def get_dag_directory(self) -> str:
"""
Returns the dag_director as a string.
:rtype: str
"""
"""Returns the dag_director as a string."""
if isinstance(self._dag_directory, Path):
return str(self._dag_directory.resolve())
else:
Expand Down
19 changes: 3 additions & 16 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def _run_file_processor(
:param thread_name: the name to use for the process that is launched
:param callback_requests: failure callback to execute
:return: the process that was launched
:rtype: multiprocessing.Process
"""
# This helper runs in the newly created process
log: logging.Logger = logging.getLogger("airflow.processor")
Expand Down Expand Up @@ -259,10 +258,7 @@ def _kill_process(self) -> None:

@property
def pid(self) -> int:
"""
:return: the PID of the process launched to process the given file
:rtype: int
"""
"""PID of the process launched to process the given file."""
if self._process is None or self._process.pid is None:
raise AirflowException("Tried to get PID before starting!")
return self._process.pid
Expand All @@ -273,7 +269,6 @@ def exit_code(self) -> int | None:
After the process is finished, this can be called to get the return code
:return: the exit code of the process
:rtype: int
"""
if self._process is None:
raise AirflowException("Tried to get exit code before starting!")
Expand All @@ -287,7 +282,6 @@ def done(self) -> bool:
Check if the process launched to process this file is done.
:return: whether the process is finished running
:rtype: bool
"""
if self._process is None or self._parent_channel is None:
raise AirflowException("Tried to see if it's done before starting!")
Expand Down Expand Up @@ -326,20 +320,14 @@ def done(self) -> bool:

@property
def result(self) -> tuple[int, int] | None:
"""
:return: result of running DagFileProcessor.process_file()
:rtype: tuple[int, int] or None
"""
"""Result of running ``DagFileProcessor.process_file()``."""
if not self.done:
raise AirflowException("Tried to get the result before it's done!")
return self._result

@property
def start_time(self) -> datetime.datetime:
"""
:return: when this started to process the file
:rtype: datetime
"""
"""Time when this started to process the file."""
if self._start_time is None:
raise AirflowException("Tried to get start time before it started!")
return self._start_time
Expand Down Expand Up @@ -751,7 +739,6 @@ def process_file(
save them to the db
:param session: Sqlalchemy ORM Session
:return: number of dags found, count of import errors
:rtype: Tuple[int, int]
"""
self.log.info("Processing file %s for tasks to queue", file_path)

Expand Down

0 comments on commit 9ab1a6a

Please sign in to comment.