Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable umask for all daemonized processes. #25664

Merged
merged 2 commits into from Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion airflow/cli/cli_parser.py
Expand Up @@ -671,7 +671,6 @@ def string_lower_type(val):
ARG_UMASK = Arg(
("-u", "--umask"),
help="Set the umask of celery worker in daemon mode",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Help needs updating?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not currently -- it's only used for the celery worker command -- the other commands don't have a --umask argument, they only use the config value.

default=conf.get('celery', 'worker_umask'),
)
ARG_WITHOUT_MINGLE = Arg(
("--without-mingle",),
Expand Down
3 changes: 3 additions & 0 deletions airflow/cli/commands/celery_command.py
Expand Up @@ -75,6 +75,7 @@ def flower(args):
pidfile=TimeoutPIDLockFile(pidfile, -1),
stdout=stdout,
stderr=stderr,
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
celery_app.start(options)
Expand Down Expand Up @@ -183,6 +184,8 @@ def worker(args):
with open(stdout, 'a') as stdout_handle, open(stderr, 'a') as stderr_handle:
if args.umask:
umask = args.umask
else:
umask = conf.get('celery', 'worker_umask', fallback=settings.DAEMON_UMASK)

stdout_handle.truncate(0)
stderr_handle.truncate(0)
Expand Down
2 changes: 2 additions & 0 deletions airflow/cli/commands/dag_processor_command.py
Expand Up @@ -22,6 +22,7 @@
import daemon
from daemon.pidfile import TimeoutPIDLockFile

from airflow import settings
from airflow.configuration import conf
from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.utils import cli as cli_utils
Expand Down Expand Up @@ -69,6 +70,7 @@ def dag_processor(args):
files_preserve=[handle],
stdout=stdout_handle,
stderr=stderr_handle,
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
try:
Expand Down
1 change: 1 addition & 0 deletions airflow/cli/commands/kerberos_command.py
Expand Up @@ -42,6 +42,7 @@ def kerberos(args):
pidfile=TimeoutPIDLockFile(pid, -1),
stdout=stdout_handle,
stderr=stderr_handle,
umask=int(settings.DAEMON_UMASK, 8),
)

with ctx:
Expand Down
1 change: 1 addition & 0 deletions airflow/cli/commands/scheduler_command.py
Expand Up @@ -74,6 +74,7 @@ def scheduler(args):
files_preserve=[handle],
stdout=stdout_handle,
stderr=stderr_handle,
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
_run_scheduler_job(args=args)
Expand Down
1 change: 1 addition & 0 deletions airflow/cli/commands/triggerer_command.py
Expand Up @@ -48,6 +48,7 @@ def triggerer(args):
files_preserve=[handle],
stdout=stdout_handle,
stderr=stderr_handle,
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
job.run()
Expand Down
1 change: 1 addition & 0 deletions airflow/cli/commands/webserver_command.py
Expand Up @@ -458,6 +458,7 @@ def monitor_gunicorn(gunicorn_master_pid: int):
files_preserve=[handle],
stdout=stdout,
stderr=stderr,
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
subprocess.Popen(run_args, close_fds=True)
Expand Down
22 changes: 13 additions & 9 deletions airflow/config_templates/config.yml
Expand Up @@ -374,6 +374,19 @@
example: ~
default: "1024"

- name: daemon_umask
description: |
The default umask to use for processes run in daemon mode (scheduler, worker, etc.)

This controls the file-creation mode mask which determines the initial value of file permission bits
for newly created files.

This value is treated as an octal integer.
version_added: 2.3.4
type: string
default: "0o077"
example: ~

- name: database
description: ~
options:
Expand Down Expand Up @@ -1652,15 +1665,6 @@
type: boolean
example: ~
default: "true"
- name: worker_umask
description: |
Umask that will be used when starting workers with the ``airflow celery worker``
in daemon mode. This control the file-creation mode mask which determines the initial
value of file permission bits for newly created files.
version_added: 2.0.0
type: string
example: ~
default: "0o077"
- name: broker_url
description: |
The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
Expand Down
13 changes: 8 additions & 5 deletions airflow/config_templates/default_airflow.cfg
Expand Up @@ -212,6 +212,14 @@ default_pool_task_slot_count = 128
# mapped tasks from clogging the scheduler.
max_map_length = 1024

# The default umask to use for processes run in daemon mode (scheduler, worker, etc.)
#
# This controls the file-creation mode mask which determines the initial value of file permission bits
# for newly created files.
#
# This value is treated as an octal integer.
daemon_umask = 0o077

[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
Expand Down Expand Up @@ -834,11 +842,6 @@ worker_prefetch_multiplier = 1
# prevent this by setting this to false. However, with this disabled Flower won't work.
worker_enable_remote_control = true

# Umask that will be used when starting workers with the ``airflow celery worker``
# in daemon mode. This control the file-creation mode mask which determines the initial
# value of file permission bits for newly created files.
worker_umask = 0o077

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more information.
broker_url = redis://redis:6379/0
Expand Down
10 changes: 10 additions & 0 deletions airflow/configuration.py
Expand Up @@ -522,7 +522,17 @@ def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
raise ValueError(f"The value {section}/{key} should be set!")
return value

@overload # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: # type: ignore[override]

...

@overload # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> Optional[str]: # type: ignore[override]

...

def get(self, section: str, key: str, **kwargs) -> Optional[str]: # type: ignore[override, misc]
section = str(section).lower()
key = str(key).lower()

Expand Down
2 changes: 2 additions & 0 deletions airflow/settings.py
Expand Up @@ -640,3 +640,5 @@ def initialize():

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get('core', 'daemon_umask', fallback='0o077')
jedcunningham marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions tests/cli/commands/test_celery_command.py
Expand Up @@ -354,6 +354,7 @@ def test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_locat
pidfile=mock_pid_file.return_value,
stderr=mock_open.return_value,
stdout=mock_open.return_value,
umask=0o077,
),
mock.call.DaemonContext().__enter__(),
mock.call.DaemonContext().__exit__(None, None, None),
Expand Down
1 change: 1 addition & 0 deletions tests/cli/commands/test_kerberos_command.py
Expand Up @@ -75,6 +75,7 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m
pidfile=mock_pid_file.return_value,
stderr=mock_open.return_value,
stdout=mock_open.return_value,
umask=0o077,
),
mock.call.DaemonContext().__enter__(),
mock.call.DaemonContext().__exit__(None, None, None),
Expand Down